redis/
client.rs

1use std::time::Duration;
2
3#[cfg(feature = "aio")]
4use crate::aio::{AsyncPushSender, DefaultAsyncDNSResolver};
5#[cfg(feature = "aio")]
6use crate::io::{tcp::TcpSettings, AsyncDNSResolver};
7use crate::{
8    connection::{connect, Connection, ConnectionInfo, ConnectionLike, IntoConnectionInfo},
9    types::{RedisResult, Value},
10};
11#[cfg(feature = "aio")]
12use std::pin::Pin;
13
14#[cfg(feature = "tls-rustls")]
15use crate::tls::{inner_build_with_tls, TlsCertificates};
16
17#[cfg(feature = "cache-aio")]
18use crate::caching::CacheConfig;
19#[cfg(all(feature = "cache-aio", feature = "connection-manager"))]
20use crate::caching::CacheManager;
21
/// The client type.
///
/// A `Client` stores the [`ConnectionInfo`] it was created with and uses it
/// every time a connection is requested.
#[derive(Debug, Clone)]
pub struct Client {
    /// Connection parameters used whenever this client opens a connection.
    pub(crate) connection_info: ConnectionInfo,
}
27
28/// The client acts as connector to the redis server.  By itself it does not
29/// do much other than providing a convenient way to fetch a connection from
30/// it.  In the future the plan is to provide a connection pool in the client.
31///
32/// When opening a client a URL in the following format should be used:
33///
34/// ```plain
35/// redis://host:port/db
36/// ```
37///
38/// Example usage::
39///
40/// ```rust,no_run
41/// let client = redis::Client::open("redis://127.0.0.1/").unwrap();
42/// let con = client.get_connection().unwrap();
43/// ```
44impl Client {
45    /// Connects to a redis server and returns a client.  This does not
46    /// actually open a connection yet but it does perform some basic
47    /// checks on the URL that might make the operation fail.
48    pub fn open<T: IntoConnectionInfo>(params: T) -> RedisResult<Client> {
49        Ok(Client {
50            connection_info: params.into_connection_info()?,
51        })
52    }
53
54    /// Instructs the client to actually connect to redis and returns a
55    /// connection object.  The connection object can be used to send
56    /// commands to the server.  This can fail with a variety of errors
57    /// (like unreachable host) so it's important that you handle those
58    /// errors.
59    pub fn get_connection(&self) -> RedisResult<Connection> {
60        connect(&self.connection_info, None)
61    }
62
63    /// Instructs the client to actually connect to redis with specified
64    /// timeout and returns a connection object.  The connection object
65    /// can be used to send commands to the server.  This can fail with
66    /// a variety of errors (like unreachable host) so it's important
67    /// that you handle those errors.
68    pub fn get_connection_with_timeout(&self, timeout: Duration) -> RedisResult<Connection> {
69        connect(&self.connection_info, Some(timeout))
70    }
71
72    /// Returns a reference of client connection info object.
73    pub fn get_connection_info(&self) -> &ConnectionInfo {
74        &self.connection_info
75    }
76
77    /// Constructs a new `Client` with parameters necessary to create a TLS connection.
78    ///
79    /// - `conn_info` - URL using the `rediss://` scheme.
80    /// - `tls_certs` - `TlsCertificates` structure containing:
81    ///     - `client_tls` - Optional `ClientTlsConfig` containing byte streams for
82    ///         - `client_cert` - client's byte stream containing client certificate in PEM format
83    ///         - `client_key` - client's byte stream containing private key in PEM format
84    ///     - `root_cert` - Optional byte stream yielding PEM formatted file for root certificates.
85    ///
86    /// If `ClientTlsConfig` ( cert+key pair ) is not provided, then client-side authentication is not enabled.
87    /// If `root_cert` is not provided, then system root certificates are used instead.
88    ///
89    /// # Examples
90    ///
91    /// ```no_run
92    /// use std::{fs::File, io::{BufReader, Read}};
93    ///
94    /// use redis::{Client, AsyncCommands as _, TlsCertificates, ClientTlsConfig};
95    ///
96    /// async fn do_redis_code(
97    ///     url: &str,
98    ///     root_cert_file: &str,
99    ///     cert_file: &str,
100    ///     key_file: &str
101    /// ) -> redis::RedisResult<()> {
102    ///     let root_cert_file = File::open(root_cert_file).expect("cannot open private cert file");
103    ///     let mut root_cert_vec = Vec::new();
104    ///     BufReader::new(root_cert_file)
105    ///         .read_to_end(&mut root_cert_vec)
106    ///         .expect("Unable to read ROOT cert file");
107    ///
108    ///     let cert_file = File::open(cert_file).expect("cannot open private cert file");
109    ///     let mut client_cert_vec = Vec::new();
110    ///     BufReader::new(cert_file)
111    ///         .read_to_end(&mut client_cert_vec)
112    ///         .expect("Unable to read client cert file");
113    ///
114    ///     let key_file = File::open(key_file).expect("cannot open private key file");
115    ///     let mut client_key_vec = Vec::new();
116    ///     BufReader::new(key_file)
117    ///         .read_to_end(&mut client_key_vec)
118    ///         .expect("Unable to read client key file");
119    ///
120    ///     let client = Client::build_with_tls(
121    ///         url,
122    ///         TlsCertificates {
123    ///             client_tls: Some(ClientTlsConfig{
124    ///                 client_cert: client_cert_vec,
125    ///                 client_key: client_key_vec,
126    ///             }),
127    ///             root_cert: Some(root_cert_vec),
128    ///         }
129    ///     )
130    ///     .expect("Unable to build client");
131    ///
132    ///     let connection_info = client.get_connection_info();
133    ///
134    ///     println!(">>> connection info: {connection_info:?}");
135    ///
136    ///     let mut con = client.get_multiplexed_async_connection().await?;
137    ///
138    ///     con.set("key1", b"foo").await?;
139    ///
140    ///     redis::cmd("SET")
141    ///         .arg(&["key2", "bar"])
142    ///         .exec_async(&mut con)
143    ///         .await?;
144    ///
145    ///     let result = redis::cmd("MGET")
146    ///         .arg(&["key1", "key2"])
147    ///         .query_async(&mut con)
148    ///         .await;
149    ///     assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
150    ///     println!("Result from MGET: {result:?}");
151    ///
152    ///     Ok(())
153    /// }
154    /// ```
155    #[cfg(feature = "tls-rustls")]
156    pub fn build_with_tls<C: IntoConnectionInfo>(
157        conn_info: C,
158        tls_certs: TlsCertificates,
159    ) -> RedisResult<Client> {
160        let connection_info = conn_info.into_connection_info()?;
161
162        inner_build_with_tls(connection_info, &tls_certs)
163    }
164}
165
/// Caching setup stored in `AsyncConnectionConfig::cache`.
///
/// Holds either a `CacheConfig` or, when the `connection-manager` feature is
/// enabled, a `CacheManager`.
#[cfg(feature = "cache-aio")]
#[derive(Clone)]
pub(crate) enum Cache {
    /// A cache configuration, as stored by `AsyncConnectionConfig::set_cache_config`.
    Config(CacheConfig),
    /// A cache manager, as stored by `AsyncConnectionConfig::set_cache_manager`.
    #[cfg(feature = "connection-manager")]
    Manager(CacheManager),
}
173
/// Options for creation of async connection
///
/// Every option starts out unset / at its default value (see the derived
/// `Default`) and is filled in through the chainable `set_*` methods.
#[cfg(feature = "aio")]
#[derive(Clone, Default)]
pub struct AsyncConnectionConfig {
    /// Maximum time to wait for a response from the server
    pub(crate) response_timeout: Option<std::time::Duration>,
    /// Maximum time to wait for a connection to be established
    pub(crate) connection_timeout: Option<std::time::Duration>,
    /// Sender for push values, if one was set via `set_push_sender`
    pub(crate) push_sender: Option<std::sync::Arc<dyn AsyncPushSender>>,
    /// Caching setup, if one was set via `set_cache_config` or `set_cache_manager`
    #[cfg(feature = "cache-aio")]
    pub(crate) cache: Option<Cache>,
    /// Settings for the underlying TCP connection
    pub(crate) tcp_settings: TcpSettings,
    /// DNS resolver for the underlying TCP connection, if one was set via `set_dns_resolver`
    pub(crate) dns_resolver: Option<std::sync::Arc<dyn AsyncDNSResolver>>,
}
188
#[cfg(feature = "aio")]
impl AsyncConnectionConfig {
    /// Builds a configuration in which no option has been set yet.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the maximum time to wait for a connection to be established.
    pub fn set_connection_timeout(self, connection_timeout: std::time::Duration) -> Self {
        Self {
            connection_timeout: Some(connection_timeout),
            ..self
        }
    }

    /// Sets the maximum time to wait for a response from the server.
    pub fn set_response_timeout(self, response_timeout: std::time::Duration) -> Self {
        Self {
            response_timeout: Some(response_timeout),
            ..self
        }
    }

    /// Sets the sender for push values.
    ///
    /// The sender can be a channel, or an arbitrary function that handles [crate::PushInfo] values.
    /// This will fail client creation if the connection isn't configured for RESP3 communications via the [crate::RedisConnectionInfo::protocol] field.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use redis::AsyncConnectionConfig;
    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    /// let config = AsyncConnectionConfig::new().set_push_sender(tx);
    /// ```
    ///
    /// ```rust
    /// # use std::sync::{Mutex, Arc};
    /// # use redis::AsyncConnectionConfig;
    /// let messages = Arc::new(Mutex::new(Vec::new()));
    /// let config = AsyncConnectionConfig::new().set_push_sender(move |msg|{
    ///     let Ok(mut messages) = messages.lock() else {
    ///         return Err(redis::aio::SendError);
    ///     };
    ///     messages.push(msg);
    ///     Ok(())
    /// });
    /// ```
    pub fn set_push_sender(self, sender: impl AsyncPushSender) -> Self {
        let shared: std::sync::Arc<dyn AsyncPushSender> = std::sync::Arc::new(sender);
        self.set_push_sender_internal(shared)
    }

    /// Stores an already shared push sender in the configuration.
    pub(crate) fn set_push_sender_internal(
        self,
        sender: std::sync::Arc<dyn AsyncPushSender>,
    ) -> Self {
        Self {
            push_sender: Some(sender),
            ..self
        }
    }

    /// Sets the cache config for MultiplexedConnection; see CacheConfig for more details.
    #[cfg(feature = "cache-aio")]
    pub fn set_cache_config(self, cache_config: CacheConfig) -> Self {
        let cache = Some(Cache::Config(cache_config));
        Self { cache, ..self }
    }

    /// Stores a cache manager as the caching setup of the configuration.
    #[cfg(all(feature = "cache-aio", feature = "connection-manager"))]
    pub(crate) fn set_cache_manager(self, cache_manager: CacheManager) -> Self {
        let cache = Some(Cache::Manager(cache_manager));
        Self { cache, ..self }
    }

    /// Sets the behavior of the underlying TCP connection.
    pub fn set_tcp_settings(mut self, tcp_settings: crate::io::tcp::TcpSettings) -> Self {
        self.tcp_settings = tcp_settings;
        self
    }

    /// Sets the DNS resolver for the underlying TCP connection.
    ///
    /// The `dns_resolver` argument must implement the [`AsyncDNSResolver`] trait.
    pub fn set_dns_resolver(self, dns_resolver: impl AsyncDNSResolver) -> Self {
        let shared: std::sync::Arc<dyn AsyncDNSResolver> = std::sync::Arc::new(dns_resolver);
        self.set_dns_resolver_internal(shared)
    }

    /// Stores an already shared DNS resolver in the configuration.
    pub(super) fn set_dns_resolver_internal(
        self,
        dns_resolver: std::sync::Arc<dyn AsyncDNSResolver>,
    ) -> Self {
        Self {
            dns_resolver: Some(dns_resolver),
            ..self
        }
    }
}
281
/// To enable async support you need to choose one of the supported runtimes and activate its
/// corresponding feature: `tokio-comp` or `async-std-comp`
284#[cfg(feature = "aio")]
285#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
286impl Client {
287    /// Returns an async connection from the client.
288    #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
289    #[deprecated(
290        note = "aio::Connection is deprecated. Use client::get_multiplexed_async_connection instead."
291    )]
292    #[allow(deprecated)]
293    pub async fn get_async_connection(&self) -> RedisResult<crate::aio::Connection> {
294        let con = self
295            .get_simple_async_connection_dynamically(
296                &DefaultAsyncDNSResolver,
297                &TcpSettings::default(),
298            )
299            .await?;
300
301        crate::aio::Connection::new(&self.connection_info.redis, con).await
302    }
303
304    /// Returns an async connection from the client.
305    #[cfg(feature = "tokio-comp")]
306    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
307    #[deprecated(
308        note = "aio::Connection is deprecated. Use client::get_multiplexed_async_connection instead."
309    )]
310    #[allow(deprecated)]
311    pub async fn get_tokio_connection(&self) -> RedisResult<crate::aio::Connection> {
312        use crate::aio::RedisRuntime;
313        Ok(
314            crate::aio::connect::<crate::aio::tokio::Tokio>(&self.connection_info)
315                .await?
316                .map(RedisRuntime::boxed),
317        )
318    }
319
320    /// Returns an async connection from the client.
321    #[cfg(feature = "async-std-comp")]
322    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
323    #[deprecated(
324        note = "aio::Connection is deprecated. Use client::get_multiplexed_async_std_connection instead."
325    )]
326    #[allow(deprecated)]
327    pub async fn get_async_std_connection(&self) -> RedisResult<crate::aio::Connection> {
328        use crate::aio::RedisRuntime;
329        Ok(
330            crate::aio::connect::<crate::aio::async_std::AsyncStd>(&self.connection_info)
331                .await?
332                .map(RedisRuntime::boxed),
333        )
334    }
335
336    /// Returns an async connection from the client.
337    #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
338    #[cfg_attr(
339        docsrs,
340        doc(cfg(any(feature = "tokio-comp", feature = "async-std-comp")))
341    )]
342    pub async fn get_multiplexed_async_connection(
343        &self,
344    ) -> RedisResult<crate::aio::MultiplexedConnection> {
345        self.get_multiplexed_async_connection_with_config(&AsyncConnectionConfig::new())
346            .await
347    }
348
349    /// Returns an async connection from the client.
350    #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
351    #[cfg_attr(
352        docsrs,
353        doc(cfg(any(feature = "tokio-comp", feature = "async-std-comp")))
354    )]
355    #[deprecated(note = "Use `get_multiplexed_async_connection_with_config` instead")]
356    pub async fn get_multiplexed_async_connection_with_timeouts(
357        &self,
358        response_timeout: std::time::Duration,
359        connection_timeout: std::time::Duration,
360    ) -> RedisResult<crate::aio::MultiplexedConnection> {
361        self.get_multiplexed_async_connection_with_config(
362            &AsyncConnectionConfig::new()
363                .set_connection_timeout(connection_timeout)
364                .set_response_timeout(response_timeout),
365        )
366        .await
367    }
368
369    /// Returns an async connection from the client.
370    #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
371    #[cfg_attr(
372        docsrs,
373        doc(cfg(any(feature = "tokio-comp", feature = "async-std-comp")))
374    )]
375    pub async fn get_multiplexed_async_connection_with_config(
376        &self,
377        config: &AsyncConnectionConfig,
378    ) -> RedisResult<crate::aio::MultiplexedConnection> {
379        let result = match Runtime::locate() {
380            #[cfg(feature = "tokio-comp")]
381            rt @ Runtime::Tokio => {
382                if let Some(connection_timeout) = config.connection_timeout {
383                    rt.timeout(
384                        connection_timeout,
385                        self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
386                            config,
387                        ),
388                    )
389                    .await
390                } else {
391                    Ok(self
392                        .get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(config)
393                        .await)
394                }
395            }
396            #[cfg(feature = "async-std-comp")]
397            rt @ Runtime::AsyncStd => {
398                if let Some(connection_timeout) = config.connection_timeout {
399                    rt.timeout(
400                        connection_timeout,
401                        self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
402                            config,
403                        ),
404                    )
405                    .await
406                } else {
407                    Ok(self
408                        .get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
409                            config,
410                        )
411                        .await)
412                }
413            }
414        };
415
416        match result {
417            Ok(Ok(connection)) => Ok(connection),
418            Ok(Err(e)) => Err(e),
419            Err(elapsed) => Err(elapsed.into()),
420        }
421    }
422
423    /// Returns an async multiplexed connection from the client.
424    ///
425    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
426    /// on the same underlying connection (tcp/unix socket).
427    #[cfg(feature = "tokio-comp")]
428    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
429    pub async fn get_multiplexed_tokio_connection_with_response_timeouts(
430        &self,
431        response_timeout: std::time::Duration,
432        connection_timeout: std::time::Duration,
433    ) -> RedisResult<crate::aio::MultiplexedConnection> {
434        let result = Runtime::locate()
435            .timeout(
436                connection_timeout,
437                self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
438                    &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
439                ),
440            )
441            .await;
442
443        match result {
444            Ok(Ok(connection)) => Ok(connection),
445            Ok(Err(e)) => Err(e),
446            Err(elapsed) => Err(elapsed.into()),
447        }
448    }
449
450    /// Returns an async multiplexed connection from the client.
451    ///
452    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
453    /// on the same underlying connection (tcp/unix socket).
454    #[cfg(feature = "tokio-comp")]
455    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
456    pub async fn get_multiplexed_tokio_connection(
457        &self,
458    ) -> RedisResult<crate::aio::MultiplexedConnection> {
459        self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
460            &AsyncConnectionConfig::new(),
461        )
462        .await
463    }
464
465    /// Returns an async multiplexed connection from the client.
466    ///
467    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
468    /// on the same underlying connection (tcp/unix socket).
469    #[cfg(feature = "async-std-comp")]
470    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
471    pub async fn get_multiplexed_async_std_connection_with_timeouts(
472        &self,
473        response_timeout: std::time::Duration,
474        connection_timeout: std::time::Duration,
475    ) -> RedisResult<crate::aio::MultiplexedConnection> {
476        let result = Runtime::locate()
477            .timeout(
478                connection_timeout,
479                self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
480                    &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
481                ),
482            )
483            .await;
484
485        match result {
486            Ok(Ok(connection)) => Ok(connection),
487            Ok(Err(e)) => Err(e),
488            Err(elapsed) => Err(elapsed.into()),
489        }
490    }
491
492    /// Returns an async multiplexed connection from the client.
493    ///
494    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
495    /// on the same underlying connection (tcp/unix socket).
496    #[cfg(feature = "async-std-comp")]
497    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
498    pub async fn get_multiplexed_async_std_connection(
499        &self,
500    ) -> RedisResult<crate::aio::MultiplexedConnection> {
501        self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
502            &AsyncConnectionConfig::new(),
503        )
504        .await
505    }
506
507    /// Returns an async multiplexed connection from the client and a future which must be polled
508    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
509    ///
510    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
511    /// on the same underlying connection (tcp/unix socket).
512    /// The multiplexer will return a timeout error on any request that takes longer then `response_timeout`.
513    #[cfg(feature = "tokio-comp")]
514    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
515    pub async fn create_multiplexed_tokio_connection_with_response_timeout(
516        &self,
517        response_timeout: std::time::Duration,
518    ) -> RedisResult<(
519        crate::aio::MultiplexedConnection,
520        impl std::future::Future<Output = ()>,
521    )> {
522        self.create_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
523            &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
524        )
525        .await
526    }
527
528    /// Returns an async multiplexed connection from the client and a future which must be polled
529    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
530    ///
531    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
532    /// on the same underlying connection (tcp/unix socket).
533    #[cfg(feature = "tokio-comp")]
534    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
535    pub async fn create_multiplexed_tokio_connection(
536        &self,
537    ) -> RedisResult<(
538        crate::aio::MultiplexedConnection,
539        impl std::future::Future<Output = ()>,
540    )> {
541        self.create_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
542            &AsyncConnectionConfig::new(),
543        )
544        .await
545    }
546
547    /// Returns an async multiplexed connection from the client and a future which must be polled
548    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
549    ///
550    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
551    /// on the same underlying connection (tcp/unix socket).
552    /// The multiplexer will return a timeout error on any request that takes longer then `response_timeout`.
553    #[cfg(feature = "async-std-comp")]
554    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
555    pub async fn create_multiplexed_async_std_connection_with_response_timeout(
556        &self,
557        response_timeout: std::time::Duration,
558    ) -> RedisResult<(
559        crate::aio::MultiplexedConnection,
560        impl std::future::Future<Output = ()>,
561    )> {
562        self.create_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
563            &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
564        )
565        .await
566    }
567
568    /// Returns an async multiplexed connection from the client and a future which must be polled
569    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
570    ///
571    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
572    /// on the same underlying connection (tcp/unix socket).
573    #[cfg(feature = "async-std-comp")]
574    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
575    pub async fn create_multiplexed_async_std_connection(
576        &self,
577    ) -> RedisResult<(
578        crate::aio::MultiplexedConnection,
579        impl std::future::Future<Output = ()>,
580    )> {
581        self.create_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
582            &AsyncConnectionConfig::new(),
583        )
584        .await
585    }
586
587    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
588    ///
589    /// The connection manager wraps a
590    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
591    /// connection fails with a connection error, then a new connection is
592    /// established in the background and the error is returned to the caller.
593    ///
594    /// This means that on connection loss at least one command will fail, but
595    /// the connection will be re-established automatically if possible. Please
596    /// refer to the [`ConnectionManager`][connection-manager] docs for
597    /// detailed reconnecting behavior.
598    ///
599    /// A connection manager can be cloned, allowing requests to be sent concurrently
600    /// on the same underlying connection (tcp/unix socket).
601    ///
602    /// [connection-manager]: aio/struct.ConnectionManager.html
603    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
604    #[cfg(feature = "connection-manager")]
605    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
606    #[deprecated(note = "use get_connection_manager instead")]
607    pub async fn get_tokio_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
608        crate::aio::ConnectionManager::new(self.clone()).await
609    }
610
611    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
612    ///
613    /// The connection manager wraps a
614    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
615    /// connection fails with a connection error, then a new connection is
616    /// established in the background and the error is returned to the caller.
617    ///
618    /// This means that on connection loss at least one command will fail, but
619    /// the connection will be re-established automatically if possible. Please
620    /// refer to the [`ConnectionManager`][connection-manager] docs for
621    /// detailed reconnecting behavior.
622    ///
623    /// A connection manager can be cloned, allowing requests to be sent concurrently
624    /// on the same underlying connection (tcp/unix socket).
625    ///
626    /// [connection-manager]: aio/struct.ConnectionManager.html
627    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
628    #[cfg(feature = "connection-manager")]
629    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
630    pub async fn get_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
631        crate::aio::ConnectionManager::new(self.clone()).await
632    }
633
634    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
635    ///
636    /// The connection manager wraps a
637    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
638    /// connection fails with a connection error, then a new connection is
639    /// established in the background and the error is returned to the caller.
640    ///
641    /// This means that on connection loss at least one command will fail, but
642    /// the connection will be re-established automatically if possible. Please
643    /// refer to the [`ConnectionManager`][connection-manager] docs for
644    /// detailed reconnecting behavior.
645    ///
646    /// A connection manager can be cloned, allowing requests to be sent concurrently
647    /// on the same underlying connection (tcp/unix socket).
648    ///
649    /// [connection-manager]: aio/struct.ConnectionManager.html
650    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
651    #[cfg(feature = "connection-manager")]
652    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
653    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
654    pub async fn get_tokio_connection_manager_with_backoff(
655        &self,
656        exponent_base: u64,
657        factor: u64,
658        number_of_retries: usize,
659    ) -> RedisResult<crate::aio::ConnectionManager> {
660        use crate::aio::ConnectionManagerConfig;
661
662        let config = ConnectionManagerConfig::new()
663            .set_exponent_base(exponent_base)
664            .set_factor(factor)
665            .set_number_of_retries(number_of_retries);
666        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
667    }
668
669    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
670    ///
671    /// The connection manager wraps a
672    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
673    /// connection fails with a connection error, then a new connection is
674    /// established in the background and the error is returned to the caller.
675    ///
676    /// This means that on connection loss at least one command will fail, but
677    /// the connection will be re-established automatically if possible. Please
678    /// refer to the [`ConnectionManager`][connection-manager] docs for
679    /// detailed reconnecting behavior.
680    ///
681    /// A connection manager can be cloned, allowing requests to be sent concurrently
682    /// on the same underlying connection (tcp/unix socket).
683    ///
684    /// [connection-manager]: aio/struct.ConnectionManager.html
685    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
686    #[cfg(feature = "connection-manager")]
687    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
688    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
689    pub async fn get_tokio_connection_manager_with_backoff_and_timeouts(
690        &self,
691        exponent_base: u64,
692        factor: u64,
693        number_of_retries: usize,
694        response_timeout: std::time::Duration,
695        connection_timeout: std::time::Duration,
696    ) -> RedisResult<crate::aio::ConnectionManager> {
697        use crate::aio::ConnectionManagerConfig;
698
699        let config = ConnectionManagerConfig::new()
700            .set_exponent_base(exponent_base)
701            .set_factor(factor)
702            .set_response_timeout(response_timeout)
703            .set_connection_timeout(connection_timeout)
704            .set_number_of_retries(number_of_retries);
705        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
706    }
707
708    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
709    ///
710    /// The connection manager wraps a
711    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
712    /// connection fails with a connection error, then a new connection is
713    /// established in the background and the error is returned to the caller.
714    ///
715    /// This means that on connection loss at least one command will fail, but
716    /// the connection will be re-established automatically if possible. Please
717    /// refer to the [`ConnectionManager`][connection-manager] docs for
718    /// detailed reconnecting behavior.
719    ///
720    /// A connection manager can be cloned, allowing requests to be sent concurrently
721    /// on the same underlying connection (tcp/unix socket).
722    ///
723    /// [connection-manager]: aio/struct.ConnectionManager.html
724    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
725    #[cfg(feature = "connection-manager")]
726    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
727    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
728    pub async fn get_connection_manager_with_backoff_and_timeouts(
729        &self,
730        exponent_base: u64,
731        factor: u64,
732        number_of_retries: usize,
733        response_timeout: std::time::Duration,
734        connection_timeout: std::time::Duration,
735    ) -> RedisResult<crate::aio::ConnectionManager> {
736        use crate::aio::ConnectionManagerConfig;
737
738        let config = ConnectionManagerConfig::new()
739            .set_exponent_base(exponent_base)
740            .set_factor(factor)
741            .set_response_timeout(response_timeout)
742            .set_connection_timeout(connection_timeout)
743            .set_number_of_retries(number_of_retries);
744        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
745    }
746
747    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
748    ///
749    /// The connection manager wraps a
750    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
751    /// connection fails with a connection error, then a new connection is
752    /// established in the background and the error is returned to the caller.
753    ///
754    /// This means that on connection loss at least one command will fail, but
755    /// the connection will be re-established automatically if possible. Please
756    /// refer to the [`ConnectionManager`][connection-manager] docs for
757    /// detailed reconnecting behavior.
758    ///
759    /// A connection manager can be cloned, allowing requests to be sent concurrently
760    /// on the same underlying connection (tcp/unix socket).
761    ///
762    /// [connection-manager]: aio/struct.ConnectionManager.html
763    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
764    #[cfg(feature = "connection-manager")]
765    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
766    pub async fn get_connection_manager_with_config(
767        &self,
768        config: crate::aio::ConnectionManagerConfig,
769    ) -> RedisResult<crate::aio::ConnectionManager> {
770        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
771    }
772
773    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
774    ///
775    /// The connection manager wraps a
776    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
777    /// connection fails with a connection error, then a new connection is
778    /// established in the background and the error is returned to the caller.
779    ///
780    /// This means that on connection loss at least one command will fail, but
781    /// the connection will be re-established automatically if possible. Please
782    /// refer to the [`ConnectionManager`][connection-manager] docs for
783    /// detailed reconnecting behavior.
784    ///
785    /// A connection manager can be cloned, allowing requests to be be sent concurrently
786    /// on the same underlying connection (tcp/unix socket).
787    ///
788    /// [connection-manager]: aio/struct.ConnectionManager.html
789    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
790    #[cfg(feature = "connection-manager")]
791    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
792    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
793    pub async fn get_connection_manager_with_backoff(
794        &self,
795        exponent_base: u64,
796        factor: u64,
797        number_of_retries: usize,
798    ) -> RedisResult<crate::aio::ConnectionManager> {
799        use crate::aio::ConnectionManagerConfig;
800
801        let config = ConnectionManagerConfig::new()
802            .set_exponent_base(exponent_base)
803            .set_factor(factor)
804            .set_number_of_retries(number_of_retries);
805        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
806    }
807
808    async fn get_multiplexed_async_connection_inner<T>(
809        &self,
810        config: &AsyncConnectionConfig,
811    ) -> RedisResult<crate::aio::MultiplexedConnection>
812    where
813        T: crate::aio::RedisRuntime,
814    {
815        let (mut connection, driver) = self
816            .create_multiplexed_async_connection_inner::<T>(config)
817            .await?;
818        let handle = T::spawn(driver);
819        connection.set_task_handle(handle);
820        Ok(connection)
821    }
822
823    async fn create_multiplexed_async_connection_inner<T>(
824        &self,
825        config: &AsyncConnectionConfig,
826    ) -> RedisResult<(
827        crate::aio::MultiplexedConnection,
828        impl std::future::Future<Output = ()>,
829    )>
830    where
831        T: crate::aio::RedisRuntime,
832    {
833        let resolver = config
834            .dns_resolver
835            .as_deref()
836            .unwrap_or(&DefaultAsyncDNSResolver);
837        let con = self
838            .get_simple_async_connection::<T>(resolver, &config.tcp_settings)
839            .await?;
840        crate::aio::MultiplexedConnection::new_with_config(
841            &self.connection_info.redis,
842            con,
843            config.clone(),
844        )
845        .await
846    }
847
848    async fn get_simple_async_connection_dynamically(
849        &self,
850        dns_resolver: &dyn AsyncDNSResolver,
851        tcp_settings: &TcpSettings,
852    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>> {
853        match Runtime::locate() {
854            #[cfg(feature = "tokio-comp")]
855            Runtime::Tokio => {
856                self.get_simple_async_connection::<crate::aio::tokio::Tokio>(
857                    dns_resolver,
858                    tcp_settings,
859                )
860                .await
861            }
862
863            #[cfg(feature = "async-std-comp")]
864            Runtime::AsyncStd => {
865                self.get_simple_async_connection::<crate::aio::async_std::AsyncStd>(
866                    dns_resolver,
867                    tcp_settings,
868                )
869                .await
870            }
871        }
872    }
873
874    async fn get_simple_async_connection<T>(
875        &self,
876        dns_resolver: &dyn AsyncDNSResolver,
877        tcp_settings: &TcpSettings,
878    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>>
879    where
880        T: crate::aio::RedisRuntime,
881    {
882        Ok(
883            crate::aio::connect_simple::<T>(&self.connection_info, dns_resolver, tcp_settings)
884                .await?
885                .boxed(),
886        )
887    }
888
889    #[cfg(feature = "connection-manager")]
890    pub(crate) fn connection_info(&self) -> &ConnectionInfo {
891        &self.connection_info
892    }
893
894    /// Returns an async receiver for pub-sub messages.
895    #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
896    // TODO - do we want to type-erase pubsub using a trait, to allow us to replace it with a different implementation later?
897    pub async fn get_async_pubsub(&self) -> RedisResult<crate::aio::PubSub> {
898        let connection = self
899            .get_simple_async_connection_dynamically(
900                &DefaultAsyncDNSResolver,
901                &TcpSettings::default(),
902            )
903            .await?;
904
905        crate::aio::PubSub::new(&self.connection_info.redis, connection).await
906    }
907
908    /// Returns an async receiver for monitor messages.
909    #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
910    // TODO - do we want to type-erase monitor using a trait, to allow us to replace it with a different implementation later?
911    pub async fn get_async_monitor(&self) -> RedisResult<crate::aio::Monitor> {
912        #[allow(deprecated)]
913        self.get_async_connection()
914            .await
915            .map(|connection| connection.into_monitor())
916    }
917}
918
919#[cfg(feature = "aio")]
920use crate::aio::Runtime;
921
922impl ConnectionLike for Client {
923    fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
924        self.get_connection()?.req_packed_command(cmd)
925    }
926
927    fn req_packed_commands(
928        &mut self,
929        cmd: &[u8],
930        offset: usize,
931        count: usize,
932    ) -> RedisResult<Vec<Value>> {
933        self.get_connection()?
934            .req_packed_commands(cmd, offset, count)
935    }
936
937    fn get_db(&self) -> i64 {
938        self.connection_info.redis.db
939    }
940
941    fn check_connection(&mut self) -> bool {
942        if let Ok(mut conn) = self.get_connection() {
943            conn.check_connection()
944        } else {
945            false
946        }
947    }
948
949    fn is_open(&self) -> bool {
950        if let Ok(conn) = self.get_connection() {
951            conn.is_open()
952        } else {
953            false
954        }
955    }
956}
957
#[cfg(test)]
mod test {
    use super::*;

    /// An IPv6 host that carries a `%interface` suffix must be accepted by
    /// `Client::open` (regression check, numbered 293 in the test name).
    #[test]
    fn regression_293_parse_ipv6_with_interface() {
        let address = ("fe80::cafe:beef%eno1", 6379);
        let opened = Client::open(address);
        assert!(opened.is_ok());
    }
}