Skip to main content

redis/
client.rs

1use std::time::Duration;
2
3#[cfg(feature = "aio")]
4use crate::aio::{AsyncPushSender, DefaultAsyncDNSResolver};
5#[cfg(feature = "token-based-authentication")]
6use crate::auth::StreamingCredentialsProvider;
7#[cfg(feature = "aio")]
8use crate::io::AsyncDNSResolver;
9use crate::{
10    connection::{Connection, ConnectionInfo, ConnectionLike, IntoConnectionInfo, connect},
11    types::{RedisResult, Value},
12};
13#[cfg(feature = "aio")]
14use std::pin::Pin;
15
16#[cfg(feature = "tls-rustls")]
17use crate::tls::{TlsCertificates, inner_build_with_tls};
18
19#[cfg(feature = "cache-aio")]
20use crate::caching::CacheConfig;
21#[cfg(all(
22    feature = "cache-aio",
23    any(feature = "connection-manager", feature = "cluster-async")
24))]
25use crate::caching::CacheManager;
26
27/// The client type.
28#[derive(Debug, Clone)]
29pub struct Client {
30    pub(crate) connection_info: ConnectionInfo,
31}
32
33/// The client acts as connector to the redis server.  By itself it does not
34/// do much other than providing a convenient way to fetch a connection from
35/// it.  In the future the plan is to provide a connection pool in the client.
36///
37/// When opening a client a URL in the following format should be used:
38///
39/// ```plain
40/// redis://host:port/db
41/// ```
42///
43/// Example usage::
44///
45/// ```rust,no_run
46/// let client = redis::Client::open("redis://127.0.0.1/").unwrap();
47/// let con = client.get_connection().unwrap();
48/// ```
49impl Client {
50    /// Connects to a redis server and returns a client.  This does not
51    /// actually open a connection yet but it does perform some basic
52    /// checks on the URL that might make the operation fail.
53    pub fn open<T: IntoConnectionInfo>(params: T) -> RedisResult<Client> {
54        Ok(Client {
55            connection_info: params.into_connection_info()?,
56        })
57    }
58
59    /// Instructs the client to actually connect to redis and returns a
60    /// connection object.  The connection object can be used to send
61    /// commands to the server.  This can fail with a variety of errors
62    /// (like unreachable host) so it's important that you handle those
63    /// errors.
64    pub fn get_connection(&self) -> RedisResult<Connection> {
65        connect(&self.connection_info, None)
66    }
67
68    /// Instructs the client to actually connect to redis with specified
69    /// timeout and returns a connection object.  The connection object
70    /// can be used to send commands to the server.  This can fail with
71    /// a variety of errors (like unreachable host) so it's important
72    /// that you handle those errors.
73    pub fn get_connection_with_timeout(&self, timeout: Duration) -> RedisResult<Connection> {
74        connect(&self.connection_info, Some(timeout))
75    }
76
77    /// Returns a reference of client connection info object.
78    pub fn get_connection_info(&self) -> &ConnectionInfo {
79        &self.connection_info
80    }
81
82    /// Constructs a new `Client` with parameters necessary to create a TLS connection.
83    ///
84    /// - `conn_info` - URL using the `rediss://` scheme.
85    /// - `tls_certs` - `TlsCertificates` structure containing:
86    ///     - `client_tls` - Optional `ClientTlsConfig` containing byte streams for
87    ///         - `client_cert` - client's byte stream containing client certificate in PEM format
88    ///         - `client_key` - client's byte stream containing private key in PEM format
89    ///     - `root_cert` - Optional byte stream yielding PEM formatted file for root certificates.
90    ///
91    /// If `ClientTlsConfig` ( cert+key pair ) is not provided, then client-side authentication is not enabled.
92    /// If `root_cert` is not provided, then system root certificates are used instead.
93    ///
94    /// # Examples
95    ///
96    /// ```no_run
97    /// use std::{fs::File, io::{BufReader, Read}};
98    ///
99    /// use redis::{Client, AsyncTypedCommands as _, TlsCertificates, ClientTlsConfig};
100    ///
101    /// async fn do_redis_code(
102    ///     url: &str,
103    ///     root_cert_file: &str,
104    ///     cert_file: &str,
105    ///     key_file: &str
106    /// ) -> redis::RedisResult<()> {
107    ///     let root_cert_file = File::open(root_cert_file).expect("cannot open private cert file");
108    ///     let mut root_cert_vec = Vec::new();
109    ///     BufReader::new(root_cert_file)
110    ///         .read_to_end(&mut root_cert_vec)
111    ///         .expect("Unable to read ROOT cert file");
112    ///
113    ///     let cert_file = File::open(cert_file).expect("cannot open private cert file");
114    ///     let mut client_cert_vec = Vec::new();
115    ///     BufReader::new(cert_file)
116    ///         .read_to_end(&mut client_cert_vec)
117    ///         .expect("Unable to read client cert file");
118    ///
119    ///     let key_file = File::open(key_file).expect("cannot open private key file");
120    ///     let mut client_key_vec = Vec::new();
121    ///     BufReader::new(key_file)
122    ///         .read_to_end(&mut client_key_vec)
123    ///         .expect("Unable to read client key file");
124    ///
125    ///     let client = Client::build_with_tls(
126    ///         url,
127    ///         TlsCertificates {
128    ///             client_tls: Some(ClientTlsConfig{
129    ///                 client_cert: client_cert_vec,
130    ///                 client_key: client_key_vec,
131    ///             }),
132    ///             root_cert: Some(root_cert_vec),
133    ///         }
134    ///     )
135    ///     .expect("Unable to build client");
136    ///
137    ///     let connection_info = client.get_connection_info();
138    ///
139    ///     println!(">>> connection info: {connection_info:?}");
140    ///
141    ///     let mut con = client.get_multiplexed_async_connection().await?;
142    ///
143    ///     con.set("key1", b"foo").await?;
144    ///
145    ///     redis::cmd("SET")
146    ///         .arg(&["key2", "bar"])
147    ///         .exec_async(&mut con)
148    ///         .await?;
149    ///
150    ///     let result = redis::cmd("MGET")
151    ///         .arg(&["key1", "key2"])
152    ///         .query_async(&mut con)
153    ///         .await;
154    ///     assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
155    ///     println!("Result from MGET: {result:?}");
156    ///
157    ///     Ok(())
158    /// }
159    /// ```
160    #[cfg(feature = "tls-rustls")]
161    pub fn build_with_tls<C: IntoConnectionInfo>(
162        conn_info: C,
163        tls_certs: TlsCertificates,
164    ) -> RedisResult<Client> {
165        let connection_info = conn_info.into_connection_info()?;
166
167        inner_build_with_tls(connection_info, &tls_certs)
168    }
169}
170
171#[cfg(feature = "cache-aio")]
172#[derive(Clone)]
173pub(crate) enum Cache {
174    Config(CacheConfig),
175    #[cfg(any(feature = "connection-manager", feature = "cluster-async"))]
176    Manager(CacheManager),
177}
178
179#[cfg(feature = "aio")]
180pub(crate) const DEFAULT_RESPONSE_TIMEOUT: Option<Duration> = Some(Duration::from_millis(500));
181#[cfg(any(feature = "aio", feature = "cluster"))]
182pub(crate) const DEFAULT_CONNECTION_TIMEOUT: Option<Duration> = Some(Duration::from_secs(1));
183
184/// Options for creation of async connection
185#[cfg(feature = "aio")]
186#[derive(Clone)]
187pub struct AsyncConnectionConfig {
188    /// Maximum time to wait for a response from the server
189    pub(crate) response_timeout: Option<Duration>,
190    /// Maximum time to wait for a connection to be established
191    pub(crate) connection_timeout: Option<Duration>,
192    pub(crate) push_sender: Option<std::sync::Arc<dyn AsyncPushSender>>,
193    #[cfg(feature = "cache-aio")]
194    pub(crate) cache: Option<Cache>,
195    pub(crate) dns_resolver: Option<std::sync::Arc<dyn AsyncDNSResolver>>,
196    pub(crate) pipeline_buffer_size: Option<usize>,
197    /// Optional credentials provider for dynamic authentication (e.g., token-based authentication)
198    #[cfg(feature = "token-based-authentication")]
199    pub(crate) credentials_provider: Option<std::sync::Arc<dyn StreamingCredentialsProvider>>,
200}
201
202#[cfg(feature = "aio")]
203impl Default for AsyncConnectionConfig {
204    fn default() -> Self {
205        Self {
206            response_timeout: DEFAULT_RESPONSE_TIMEOUT,
207            connection_timeout: DEFAULT_CONNECTION_TIMEOUT,
208            push_sender: Default::default(),
209            #[cfg(feature = "cache-aio")]
210            cache: Default::default(),
211            dns_resolver: Default::default(),
212            pipeline_buffer_size: None,
213            #[cfg(feature = "token-based-authentication")]
214            credentials_provider: None,
215        }
216    }
217}
218
219#[cfg(feature = "aio")]
220impl AsyncConnectionConfig {
221    /// Creates a new instance of the options with nothing set
222    pub fn new() -> Self {
223        Self::default()
224    }
225
226    /// Each connection attempt to the server will time out after `connection_timeout`.
227    ///
228    /// Set `None` if you don't want the connection attempt to time out.
229    pub fn set_connection_timeout(mut self, connection_timeout: Option<Duration>) -> Self {
230        self.connection_timeout = connection_timeout;
231        self
232    }
233
234    /// The new connection will time out operations after `response_timeout` has passed.
235    ///
236    /// Set `None` if you don't want requests to time out.
237    pub fn set_response_timeout(mut self, response_timeout: Option<Duration>) -> Self {
238        self.response_timeout = response_timeout;
239        self
240    }
241
242    /// Sets sender sender for push values.
243    ///
244    /// The sender can be a channel, or an arbitrary function that handles [crate::PushInfo] values.
245    /// This will fail client creation if the connection isn't configured for RESP3 communications via the [crate::RedisConnectionInfo::set_protocol] function.
246    ///
247    /// # Examples
248    ///
249    /// ```rust
250    /// # use redis::AsyncConnectionConfig;
251    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
252    /// let config = AsyncConnectionConfig::new().set_push_sender(tx);
253    /// ```
254    ///
255    /// ```rust
256    /// # use std::sync::{Mutex, Arc};
257    /// # use redis::AsyncConnectionConfig;
258    /// let messages = Arc::new(Mutex::new(Vec::new()));
259    /// let config = AsyncConnectionConfig::new().set_push_sender(move |msg|{
260    ///     let Ok(mut messages) = messages.lock() else {
261    ///         return Err(redis::aio::SendError);
262    ///     };
263    ///     messages.push(msg);
264    ///     Ok(())
265    /// });
266    /// ```
267    pub fn set_push_sender(self, sender: impl AsyncPushSender) -> Self {
268        self.set_push_sender_internal(std::sync::Arc::new(sender))
269    }
270
271    pub(crate) fn set_push_sender_internal(
272        mut self,
273        sender: std::sync::Arc<dyn AsyncPushSender>,
274    ) -> Self {
275        self.push_sender = Some(sender);
276        self
277    }
278
279    /// Sets cache config for MultiplexedConnection, check CacheConfig for more details.
280    #[cfg(feature = "cache-aio")]
281    pub fn set_cache_config(mut self, cache_config: CacheConfig) -> Self {
282        self.cache = Some(Cache::Config(cache_config));
283        self
284    }
285
286    #[cfg(all(
287        feature = "cache-aio",
288        any(feature = "connection-manager", feature = "cluster-async")
289    ))]
290    pub(crate) fn set_cache_manager(mut self, cache_manager: CacheManager) -> Self {
291        self.cache = Some(Cache::Manager(cache_manager));
292        self
293    }
294
295    /// Set the DNS resolver for the underlying TCP connection.
296    ///
297    /// The parameter resolver must implement the [`crate::io::AsyncDNSResolver`] trait.
298    pub fn set_dns_resolver(self, dns_resolver: impl AsyncDNSResolver) -> Self {
299        self.set_dns_resolver_internal(std::sync::Arc::new(dns_resolver))
300    }
301
302    pub(super) fn set_dns_resolver_internal(
303        mut self,
304        dns_resolver: std::sync::Arc<dyn AsyncDNSResolver>,
305    ) -> Self {
306        self.dns_resolver = Some(dns_resolver);
307        self
308    }
309
310    /// Sets the buffer size for the internal pipeline channel.
311    ///
312    /// The multiplexed connection uses an internal channel to queue Redis commands
313    /// before sending them to the server. This setting controls how many commands
314    /// can be buffered in that channel.
315    ///
316    /// When the buffer is full, callers will asynchronously wait until space becomes
317    /// available. A larger buffer allows more commands to be queued during bursts of
318    /// activity, reducing wait time for callers. However, this comes at the cost of
319    /// increased memory usage.
320    ///
321    /// The default value is 50. Consider increasing this value for high-concurrency
322    /// scenarios (e.g., web servers handling many simultaneous requests) where
323    /// buffer contention may increase overall latency and cause upstream timeouts.
324    pub fn set_pipeline_buffer_size(mut self, size: usize) -> Self {
325        self.pipeline_buffer_size = Some(size);
326        self
327    }
328
329    /// Sets a credentials provider for dynamic authentication (e.g., token-based authentication).
330    ///
331    /// This is useful for authentication mechanisms that require periodic credential refresh,
332    /// such as Microsoft Entra ID (formerly Azure AD).
333    ///
334    /// # Example
335    ///
336    /// ```rust,no_run
337    /// # #[cfg(feature = "entra-id")]
338    /// # {
339    /// use redis::{AsyncConnectionConfig, EntraIdCredentialsProvider, RetryConfig};
340    ///
341    /// # async fn example() -> redis::RedisResult<()> {
342    /// let mut provider = EntraIdCredentialsProvider::new_developer_tools()?;
343    /// provider.start(RetryConfig::default());
344    ///
345    /// let config = AsyncConnectionConfig::new()
346    ///     .set_credentials_provider(provider);
347    /// # Ok(())
348    /// # }
349    /// # }
350    /// ```
351    #[cfg(feature = "token-based-authentication")]
352    pub fn set_credentials_provider<P>(mut self, provider: P) -> Self
353    where
354        P: StreamingCredentialsProvider + 'static,
355    {
356        self.credentials_provider = Some(std::sync::Arc::new(provider));
357        self
358    }
359}
360
361/// To enable async support you need to chose one of the supported runtimes and active its
362/// corresponding feature: `tokio-comp` or `smol-comp`
363#[cfg(feature = "aio")]
364#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
365impl Client {
366    /// Returns an async connection from the client.
367    #[cfg(feature = "aio")]
368    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
369    pub async fn get_multiplexed_async_connection(
370        &self,
371    ) -> RedisResult<crate::aio::MultiplexedConnection> {
372        self.get_multiplexed_async_connection_with_config(&AsyncConnectionConfig::new())
373            .await
374    }
375
376    /// Returns an async connection from the client.
377    #[cfg(feature = "aio")]
378    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
379    pub async fn get_multiplexed_async_connection_with_config(
380        &self,
381        config: &AsyncConnectionConfig,
382    ) -> RedisResult<crate::aio::MultiplexedConnection> {
383        match Runtime::locate() {
384            #[cfg(feature = "tokio-comp")]
385            rt @ Runtime::Tokio => self
386                .get_multiplexed_async_connection_inner_with_timeout::<crate::aio::tokio::Tokio>(
387                    config, rt,
388                )
389                .await,
390
391            #[cfg(feature = "smol-comp")]
392            rt @ Runtime::Smol => {
393                self.get_multiplexed_async_connection_inner_with_timeout::<crate::aio::smol::Smol>(
394                    config, rt,
395                )
396                .await
397            }
398        }
399    }
400
401    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
402    ///
403    /// The connection manager wraps a
404    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
405    /// connection fails with a connection error, then a new connection is
406    /// established in the background and the error is returned to the caller.
407    ///
408    /// This means that on connection loss at least one command will fail, but
409    /// the connection will be re-established automatically if possible. Please
410    /// refer to the [`ConnectionManager`][connection-manager] docs for
411    /// detailed reconnecting behavior.
412    ///
413    /// A connection manager can be cloned, allowing requests to be sent concurrently
414    /// on the same underlying connection (tcp/unix socket).
415    ///
416    /// [connection-manager]: aio/struct.ConnectionManager.html
417    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
418    #[cfg(feature = "connection-manager")]
419    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
420    pub async fn get_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
421        crate::aio::ConnectionManager::new(self.clone()).await
422    }
423
424    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
425    ///
426    /// The connection manager wraps a
427    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
428    /// connection fails with a connection error, then a new connection is
429    /// established in the background and the error is returned to the caller.
430    ///
431    /// This means that on connection loss at least one command will fail, but
432    /// the connection will be re-established automatically if possible. Please
433    /// refer to the [`ConnectionManager`][connection-manager] docs for
434    /// detailed reconnecting behavior.
435    ///
436    /// A connection manager can be cloned, allowing requests to be sent concurrently
437    /// on the same underlying connection (tcp/unix socket).
438    ///
439    /// [connection-manager]: aio/struct.ConnectionManager.html
440    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
441    #[cfg(feature = "connection-manager")]
442    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
443    pub async fn get_connection_manager_with_config(
444        &self,
445        config: crate::aio::ConnectionManagerConfig,
446    ) -> RedisResult<crate::aio::ConnectionManager> {
447        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
448    }
449
450    async fn get_multiplexed_async_connection_inner_with_timeout<T>(
451        &self,
452        config: &AsyncConnectionConfig,
453        rt: Runtime,
454    ) -> RedisResult<crate::aio::MultiplexedConnection>
455    where
456        T: crate::aio::RedisRuntime,
457    {
458        let result = if let Some(connection_timeout) = config.connection_timeout {
459            rt.timeout(
460                connection_timeout,
461                self.get_multiplexed_async_connection_inner::<T>(config),
462            )
463            .await
464        } else {
465            Ok(self
466                .get_multiplexed_async_connection_inner::<T>(config)
467                .await)
468        };
469
470        match result {
471            Ok(Ok(connection)) => Ok(connection),
472            Ok(Err(e)) => Err(e),
473            Err(elapsed) => Err(elapsed.into()),
474        }
475    }
476
477    async fn get_multiplexed_async_connection_inner<T>(
478        &self,
479        config: &AsyncConnectionConfig,
480    ) -> RedisResult<crate::aio::MultiplexedConnection>
481    where
482        T: crate::aio::RedisRuntime,
483    {
484        let (mut connection, driver) = self
485            .create_multiplexed_async_connection_inner::<T>(config)
486            .await?;
487        let handle = T::spawn(driver);
488        connection.set_task_handle(handle);
489        Ok(connection)
490    }
491
492    async fn create_multiplexed_async_connection_inner<T>(
493        &self,
494        config: &AsyncConnectionConfig,
495    ) -> RedisResult<(
496        crate::aio::MultiplexedConnection,
497        impl std::future::Future<Output = ()> + 'static,
498    )>
499    where
500        T: crate::aio::RedisRuntime,
501    {
502        let resolver = config
503            .dns_resolver
504            .as_deref()
505            .unwrap_or(&DefaultAsyncDNSResolver);
506        let con = self.get_simple_async_connection::<T>(resolver).await?;
507        crate::aio::MultiplexedConnection::new_with_config(
508            &self.connection_info.redis,
509            con,
510            config.clone(),
511        )
512        .await
513    }
514
515    async fn get_simple_async_connection_dynamically(
516        &self,
517        dns_resolver: &dyn AsyncDNSResolver,
518    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>> {
519        match Runtime::locate() {
520            #[cfg(feature = "tokio-comp")]
521            Runtime::Tokio => {
522                self.get_simple_async_connection::<crate::aio::tokio::Tokio>(dns_resolver)
523                    .await
524            }
525
526            #[cfg(feature = "smol-comp")]
527            Runtime::Smol => {
528                self.get_simple_async_connection::<crate::aio::smol::Smol>(dns_resolver)
529                    .await
530            }
531        }
532    }
533
534    async fn get_simple_async_connection<T>(
535        &self,
536        dns_resolver: &dyn AsyncDNSResolver,
537    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>>
538    where
539        T: crate::aio::RedisRuntime,
540    {
541        Ok(
542            crate::aio::connect_simple::<T>(&self.connection_info, dns_resolver)
543                .await?
544                .boxed(),
545        )
546    }
547
548    #[cfg(feature = "connection-manager")]
549    pub(crate) fn connection_info(&self) -> &ConnectionInfo {
550        &self.connection_info
551    }
552
553    /// Returns an async receiver for pub-sub messages.
554    #[cfg(feature = "aio")]
555    // TODO - do we want to type-erase pubsub using a trait, to allow us to replace it with a different implementation later?
556    pub async fn get_async_pubsub(&self) -> RedisResult<crate::aio::PubSub> {
557        let connection = self
558            .get_simple_async_connection_dynamically(&DefaultAsyncDNSResolver)
559            .await?;
560
561        crate::aio::PubSub::new(&self.connection_info.redis, connection).await
562    }
563
564    /// Returns an async receiver for monitor messages.
565    #[cfg(feature = "aio")]
566    pub async fn get_async_monitor(&self) -> RedisResult<crate::aio::Monitor> {
567        let connection = self
568            .get_simple_async_connection_dynamically(&DefaultAsyncDNSResolver)
569            .await?;
570        crate::aio::Monitor::new(&self.connection_info.redis, connection).await
571    }
572}
573
574#[cfg(feature = "aio")]
575use crate::aio::Runtime;
576
577impl ConnectionLike for Client {
578    fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
579        self.get_connection()?.req_packed_command(cmd)
580    }
581
582    fn req_packed_commands(
583        &mut self,
584        cmd: &[u8],
585        offset: usize,
586        count: usize,
587    ) -> RedisResult<Vec<Value>> {
588        self.get_connection()?
589            .req_packed_commands(cmd, offset, count)
590    }
591
592    fn get_db(&self) -> i64 {
593        self.connection_info.redis.db
594    }
595
596    fn check_connection(&mut self) -> bool {
597        if let Ok(mut conn) = self.get_connection() {
598            conn.check_connection()
599        } else {
600            false
601        }
602    }
603
604    fn is_open(&self) -> bool {
605        if let Ok(conn) = self.get_connection() {
606            conn.is_open()
607        } else {
608            false
609        }
610    }
611}
612
613#[cfg(test)]
614mod test {
615    use super::*;
616    use assert_matches::assert_matches;
617
618    #[test]
619    fn regression_293_parse_ipv6_with_interface() {
620        assert_matches!(Client::open(("fe80::cafe:beef%eno1", 6379)), Ok(_));
621    }
622
623    #[cfg(feature = "aio")]
624    #[test]
625    fn test_async_connection_config_pipeline_buffer_size_default() {
626        let config = AsyncConnectionConfig::new();
627        assert_eq!(config.pipeline_buffer_size, None);
628    }
629
630    #[cfg(feature = "aio")]
631    #[test]
632    fn test_async_connection_config_pipeline_buffer_size_custom() {
633        let config = AsyncConnectionConfig::new().set_pipeline_buffer_size(100);
634        assert_eq!(config.pipeline_buffer_size, Some(100));
635    }
636}