Skip to main content

redis/
client.rs

1use std::time::Duration;
2
3#[cfg(feature = "aio")]
4use crate::aio::{AsyncPushSender, DefaultAsyncDNSResolver};
5#[cfg(feature = "token-based-authentication")]
6use crate::auth::StreamingCredentialsProvider;
7#[cfg(feature = "aio")]
8use crate::io::AsyncDNSResolver;
9use crate::{
10    connection::{Connection, ConnectionInfo, ConnectionLike, IntoConnectionInfo, connect},
11    types::{RedisResult, Value},
12};
13#[cfg(feature = "aio")]
14use std::pin::Pin;
15
16#[cfg(feature = "tls-rustls")]
17use crate::tls::{TlsCertificates, inner_build_with_tls};
18
19#[cfg(feature = "cache-aio")]
20use crate::caching::CacheConfig;
21#[cfg(all(
22    feature = "cache-aio",
23    any(feature = "connection-manager", feature = "cluster-async")
24))]
25use crate::caching::CacheManager;
26
27/// The client type.
28#[derive(Debug, Clone)]
29pub struct Client {
30    pub(crate) connection_info: ConnectionInfo,
31}
32
33/// The client acts as connector to the redis server.  By itself it does not
34/// do much other than providing a convenient way to fetch a connection from
35/// it.  In the future the plan is to provide a connection pool in the client.
36///
37/// When opening a client a URL in the following format should be used:
38///
39/// ```plain
40/// redis://host:port/db
41/// ```
42///
43/// Example usage::
44///
45/// ```rust,no_run
46/// let client = redis::Client::open("redis://127.0.0.1/").unwrap();
47/// let con = client.get_connection().unwrap();
48/// ```
49impl Client {
50    /// Connects to a redis server and returns a client.  This does not
51    /// actually open a connection yet but it does perform some basic
52    /// checks on the URL that might make the operation fail.
53    pub fn open<T: IntoConnectionInfo>(params: T) -> RedisResult<Client> {
54        Ok(Client {
55            connection_info: params.into_connection_info()?,
56        })
57    }
58
59    /// Instructs the client to actually connect to redis and returns a
60    /// connection object.  The connection object can be used to send
61    /// commands to the server.  This can fail with a variety of errors
62    /// (like unreachable host) so it's important that you handle those
63    /// errors.
64    pub fn get_connection(&self) -> RedisResult<Connection> {
65        connect(&self.connection_info, None)
66    }
67
68    /// Instructs the client to actually connect to redis with specified
69    /// timeout and returns a connection object.  The connection object
70    /// can be used to send commands to the server.  This can fail with
71    /// a variety of errors (like unreachable host) so it's important
72    /// that you handle those errors.
73    pub fn get_connection_with_timeout(&self, timeout: Duration) -> RedisResult<Connection> {
74        connect(&self.connection_info, Some(timeout))
75    }
76
77    /// Returns a reference of client connection info object.
78    pub fn get_connection_info(&self) -> &ConnectionInfo {
79        &self.connection_info
80    }
81
82    /// Constructs a new `Client` with parameters necessary to create a TLS connection.
83    ///
84    /// - `conn_info` - URL using the `rediss://` scheme.
85    /// - `tls_certs` - `TlsCertificates` structure containing:
86    ///     - `client_tls` - Optional `ClientTlsConfig` containing byte streams for
87    ///         - `client_cert` - client's byte stream containing client certificate in PEM format
88    ///         - `client_key` - client's byte stream containing private key in PEM format
89    ///     - `root_cert` - Optional byte stream yielding PEM formatted file for root certificates.
90    ///
91    /// If `ClientTlsConfig` ( cert+key pair ) is not provided, then client-side authentication is not enabled.
92    /// If `root_cert` is not provided, then system root certificates are used instead.
93    ///
94    /// # Examples
95    ///
96    /// ```no_run
97    /// use std::{fs::File, io::{BufReader, Read}};
98    ///
99    /// use redis::{Client, AsyncTypedCommands as _, TlsCertificates, ClientTlsConfig};
100    ///
101    /// async fn do_redis_code(
102    ///     url: &str,
103    ///     root_cert_file: &str,
104    ///     cert_file: &str,
105    ///     key_file: &str
106    /// ) -> redis::RedisResult<()> {
107    ///     let root_cert_file = File::open(root_cert_file).expect("cannot open private cert file");
108    ///     let mut root_cert_vec = Vec::new();
109    ///     BufReader::new(root_cert_file)
110    ///         .read_to_end(&mut root_cert_vec)
111    ///         .expect("Unable to read ROOT cert file");
112    ///
113    ///     let cert_file = File::open(cert_file).expect("cannot open private cert file");
114    ///     let mut client_cert_vec = Vec::new();
115    ///     BufReader::new(cert_file)
116    ///         .read_to_end(&mut client_cert_vec)
117    ///         .expect("Unable to read client cert file");
118    ///
119    ///     let key_file = File::open(key_file).expect("cannot open private key file");
120    ///     let mut client_key_vec = Vec::new();
121    ///     BufReader::new(key_file)
122    ///         .read_to_end(&mut client_key_vec)
123    ///         .expect("Unable to read client key file");
124    ///
125    ///     let client = Client::build_with_tls(
126    ///         url,
127    ///         TlsCertificates {
128    ///             client_tls: Some(ClientTlsConfig{
129    ///                 client_cert: client_cert_vec,
130    ///                 client_key: client_key_vec,
131    ///             }),
132    ///             root_cert: Some(root_cert_vec),
133    ///         }
134    ///     )
135    ///     .expect("Unable to build client");
136    ///
137    ///     let connection_info = client.get_connection_info();
138    ///
139    ///     println!(">>> connection info: {connection_info:?}");
140    ///
141    ///     let mut con = client.get_multiplexed_async_connection().await?;
142    ///
143    ///     con.set("key1", b"foo").await?;
144    ///
145    ///     redis::cmd("SET")
146    ///         .arg(&["key2", "bar"])
147    ///         .exec_async(&mut con)
148    ///         .await?;
149    ///
150    ///     let result = redis::cmd("MGET")
151    ///         .arg(&["key1", "key2"])
152    ///         .query_async(&mut con)
153    ///         .await;
154    ///     assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
155    ///     println!("Result from MGET: {result:?}");
156    ///
157    ///     Ok(())
158    /// }
159    /// ```
160    #[cfg(feature = "tls-rustls")]
161    pub fn build_with_tls<C: IntoConnectionInfo>(
162        conn_info: C,
163        tls_certs: TlsCertificates,
164    ) -> RedisResult<Client> {
165        let connection_info = conn_info.into_connection_info()?;
166
167        inner_build_with_tls(connection_info, &tls_certs)
168    }
169}
170
/// Caching setup carried by an [`AsyncConnectionConfig`]: either a plain
/// configuration or an already constructed manager.
#[cfg(feature = "cache-aio")]
#[derive(Clone)]
pub(crate) enum Cache {
    /// A cache configuration, as stored by `AsyncConnectionConfig::set_cache_config`.
    Config(CacheConfig),
    /// A cache manager, as stored by `AsyncConnectionConfig::set_cache_manager`.
    #[cfg(any(feature = "connection-manager", feature = "cluster-async"))]
    Manager(CacheManager),
}
178
/// Response timeout used by `AsyncConnectionConfig::default()`: 500 milliseconds.
#[cfg(feature = "aio")]
pub(crate) const DEFAULT_RESPONSE_TIMEOUT: Option<Duration> = Some(Duration::from_millis(500));
/// Connection timeout used by `AsyncConnectionConfig::default()`: 1 second.
#[cfg(any(feature = "aio", feature = "cluster"))]
pub(crate) const DEFAULT_CONNECTION_TIMEOUT: Option<Duration> = Some(Duration::from_secs(1));
183
/// Options for creation of async connection
///
/// Build one with [`AsyncConnectionConfig::new`] and adjust it through the
/// chained `set_*` methods.
#[cfg(feature = "aio")]
#[derive(Clone)]
pub struct AsyncConnectionConfig {
    /// Maximum time to wait for a response from the server
    pub(crate) response_timeout: Option<Duration>,
    /// Maximum time to wait for a connection to be established
    pub(crate) connection_timeout: Option<Duration>,
    /// Optional sender that receives push values
    pub(crate) push_sender: Option<std::sync::Arc<dyn AsyncPushSender>>,
    /// Optional cache configuration or cache manager
    #[cfg(feature = "cache-aio")]
    pub(crate) cache: Option<Cache>,
    /// Optional DNS resolver; the default async resolver is used when unset
    pub(crate) dns_resolver: Option<std::sync::Arc<dyn AsyncDNSResolver>>,
    /// Optional buffer size for the internal pipeline channel
    pub(crate) pipeline_buffer_size: Option<usize>,
    /// Optional cap on the number of concurrent in-flight requests
    pub(crate) concurrency_limit: Option<usize>,
    /// Optional credentials provider for dynamic authentication (e.g., token-based authentication)
    #[cfg(feature = "token-based-authentication")]
    pub(crate) credentials_provider: Option<std::sync::Arc<dyn StreamingCredentialsProvider>>,
}
202
#[cfg(feature = "aio")]
impl Default for AsyncConnectionConfig {
    /// Builds a config with the crate's default response and connection
    /// timeouts and every optional setting left unset.
    fn default() -> Self {
        Self {
            response_timeout: DEFAULT_RESPONSE_TIMEOUT,
            connection_timeout: DEFAULT_CONNECTION_TIMEOUT,
            push_sender: None,
            #[cfg(feature = "cache-aio")]
            cache: None,
            dns_resolver: None,
            pipeline_buffer_size: None,
            concurrency_limit: None,
            #[cfg(feature = "token-based-authentication")]
            credentials_provider: None,
        }
    }
}
220
#[cfg(feature = "aio")]
impl AsyncConnectionConfig {
    /// Creates a new instance of the config with all parameters set to default values.
    pub fn new() -> Self {
        Default::default()
    }

    /// Each connection attempt to the server will time out after `connection_timeout`.
    ///
    /// Set `None` if you don't want the connection attempt to time out.
    pub fn set_connection_timeout(self, connection_timeout: Option<Duration>) -> Self {
        let mut config = self;
        config.connection_timeout = connection_timeout;
        config
    }

    /// The new connection will time out operations after `response_timeout` has passed.
    ///
    /// Set `None` if you don't want requests to time out.
    pub fn set_response_timeout(self, response_timeout: Option<Duration>) -> Self {
        let mut config = self;
        config.response_timeout = response_timeout;
        config
    }

    /// Sets the sender for push values.
    ///
    /// The sender can be a channel, or an arbitrary function that handles [crate::PushInfo] values.
    /// This will fail client creation if the connection isn't configured for RESP3 communications via the [crate::RedisConnectionInfo::set_protocol] function.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use redis::AsyncConnectionConfig;
    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    /// let config = AsyncConnectionConfig::new().set_push_sender(tx);
    /// ```
    ///
    /// ```rust
    /// # use std::sync::{Mutex, Arc};
    /// # use redis::AsyncConnectionConfig;
    /// let messages = Arc::new(Mutex::new(Vec::new()));
    /// let config = AsyncConnectionConfig::new().set_push_sender(move |msg|{
    ///     let Ok(mut messages) = messages.lock() else {
    ///         return Err(redis::aio::SendError);
    ///     };
    ///     messages.push(msg);
    ///     Ok(())
    /// });
    /// ```
    pub fn set_push_sender(self, sender: impl AsyncPushSender) -> Self {
        let shared_sender = std::sync::Arc::new(sender);
        self.set_push_sender_internal(shared_sender)
    }

    /// Stores an already shared push sender in the config.
    pub(crate) fn set_push_sender_internal(
        self,
        sender: std::sync::Arc<dyn AsyncPushSender>,
    ) -> Self {
        let mut config = self;
        config.push_sender = Some(sender);
        config
    }

    /// Sets cache config for MultiplexedConnection, check CacheConfig for more details.
    #[cfg(feature = "cache-aio")]
    pub fn set_cache_config(self, cache_config: CacheConfig) -> Self {
        let mut config = self;
        config.cache = Some(Cache::Config(cache_config));
        config
    }

    /// Stores an existing cache manager in the config.
    #[cfg(all(
        feature = "cache-aio",
        any(feature = "connection-manager", feature = "cluster-async")
    ))]
    pub(crate) fn set_cache_manager(self, cache_manager: CacheManager) -> Self {
        let mut config = self;
        config.cache = Some(Cache::Manager(cache_manager));
        config
    }

    /// Set the DNS resolver for the underlying TCP connection.
    ///
    /// The parameter resolver must implement the [`crate::io::AsyncDNSResolver`] trait.
    pub fn set_dns_resolver(self, dns_resolver: impl AsyncDNSResolver) -> Self {
        let shared_resolver = std::sync::Arc::new(dns_resolver);
        self.set_dns_resolver_internal(shared_resolver)
    }

    /// Stores an already shared DNS resolver in the config.
    pub(super) fn set_dns_resolver_internal(
        self,
        dns_resolver: std::sync::Arc<dyn AsyncDNSResolver>,
    ) -> Self {
        let mut config = self;
        config.dns_resolver = Some(dns_resolver);
        config
    }

    /// Sets the buffer size for the internal pipeline channel.
    ///
    /// The multiplexed connection uses an internal channel to queue Redis commands
    /// before sending them to the server. This setting controls how many commands
    /// can be buffered in that channel.
    ///
    /// When the buffer is full, callers will asynchronously wait until space becomes
    /// available. A larger buffer allows more commands to be queued during bursts of
    /// activity, reducing wait time for callers. However, this comes at the cost of
    /// increased memory usage.
    ///
    /// The default value is 50. Consider increasing this value for high-concurrency
    /// scenarios (e.g., web servers handling many simultaneous requests) where
    /// buffer contention may increase overall latency and cause upstream timeouts.
    pub fn set_pipeline_buffer_size(self, size: usize) -> Self {
        let mut config = self;
        config.pipeline_buffer_size = Some(size);
        config
    }

    /// Sets the maximum number of concurrent in-flight requests on this connection.
    ///
    /// When set, at most `limit` requests can be awaiting a response at any given time.
    /// Additional requests will wait until an in-flight request completes.
    ///
    /// Pipelined commands try to acquire one permit per command, but will proceed with
    /// fewer if not all are immediately available. This means a pipeline may temporarily
    /// push the effective in-flight count above the limit.
    ///
    /// This is useful for preventing a large backlog of commands from building up when the
    /// server becomes slow or unresponsive. Without a limit, requests continue to queue
    /// unboundedly. When the server is degraded, requests near the back of the queue spend
    /// most of their time waiting behind earlier requests and are likely to hit their response
    /// timeout before the server even processes them -- wasting work on both sides. Setting a
    /// concurrency limit caps the number of in-flight requests, so backpressure is applied
    /// earlier and fewer requests are lost to timeouts.
    ///
    /// By default there is no limit.
    pub fn set_concurrency_limit(self, limit: usize) -> Self {
        let mut config = self;
        config.concurrency_limit = Some(limit);
        config
    }

    /// Sets a credentials provider for dynamic authentication (e.g., token-based authentication).
    ///
    /// This is useful for authentication mechanisms that require periodic credential refresh,
    /// such as Microsoft Entra ID (formerly Azure AD).
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # #[cfg(feature = "entra-id")]
    /// # {
    /// use redis::{AsyncConnectionConfig, EntraIdCredentialsProvider, RetryConfig};
    ///
    /// # async fn example() -> redis::RedisResult<()> {
    /// let mut provider = EntraIdCredentialsProvider::new_developer_tools()?;
    /// provider.start(RetryConfig::default());
    ///
    /// let config = AsyncConnectionConfig::new()
    ///     .set_credentials_provider(provider);
    /// # Ok(())
    /// # }
    /// # }
    /// ```
    #[cfg(feature = "token-based-authentication")]
    pub fn set_credentials_provider<P>(self, provider: P) -> Self
    where
        P: StreamingCredentialsProvider + 'static,
    {
        let shared_provider = std::sync::Arc::new(provider);
        self.set_credentials_provider_internal(shared_provider)
    }

    /// Stores an already shared credentials provider in the config.
    #[cfg(feature = "token-based-authentication")]
    pub(crate) fn set_credentials_provider_internal(
        self,
        provider: std::sync::Arc<dyn StreamingCredentialsProvider>,
    ) -> Self {
        let mut config = self;
        config.credentials_provider = Some(provider);
        config
    }
}
393
/// To enable async support you need to choose one of the supported runtimes and activate its
/// corresponding feature: `tokio-comp` or `smol-comp`
#[cfg(feature = "aio")]
#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
impl Client {
    /// Returns an async connection from the client, created with the default
    /// [`AsyncConnectionConfig`].
    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
    pub async fn get_multiplexed_async_connection(
        &self,
    ) -> RedisResult<crate::aio::MultiplexedConnection> {
        let default_config = AsyncConnectionConfig::new();
        self.get_multiplexed_async_connection_with_config(&default_config)
            .await
    }

    /// Returns an async connection from the client, created according to `config`.
    ///
    /// The runtime is picked via `Runtime::locate()`.
    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
    pub async fn get_multiplexed_async_connection_with_config(
        &self,
        config: &AsyncConnectionConfig,
    ) -> RedisResult<crate::aio::MultiplexedConnection> {
        let rt = Runtime::locate();
        match rt {
            #[cfg(feature = "tokio-comp")]
            Runtime::Tokio => {
                self.get_multiplexed_async_connection_inner_with_timeout::<crate::aio::tokio::Tokio>(
                    config, rt,
                )
                .await
            }

            #[cfg(feature = "smol-comp")]
            Runtime::Smol => {
                self.get_multiplexed_async_connection_inner_with_timeout::<crate::aio::smol::Smol>(
                    config, rt,
                )
                .await
            }
        }
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
    ///
    /// The connection manager wraps a
    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
    /// connection fails with a connection error, then a new connection is
    /// established in the background and the error is returned to the caller.
    ///
    /// This means that on connection loss at least one command will fail, but
    /// the connection will be re-established automatically if possible. Please
    /// refer to the [`ConnectionManager`][connection-manager] docs for
    /// detailed reconnecting behavior.
    ///
    /// A connection manager can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    pub async fn get_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
        let client = self.clone();
        crate::aio::ConnectionManager::new(client).await
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client without establishing a connection.
    ///
    /// The connection will be established lazily on the first request.
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    pub fn get_connection_manager_lazy(
        &self,
        config: crate::aio::ConnectionManagerConfig,
    ) -> RedisResult<crate::aio::ConnectionManager> {
        let client = self.clone();
        crate::aio::ConnectionManager::new_lazy_with_config(client, config)
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client,
    /// created according to `config`.
    ///
    /// The connection manager wraps a
    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
    /// connection fails with a connection error, then a new connection is
    /// established in the background and the error is returned to the caller.
    ///
    /// This means that on connection loss at least one command will fail, but
    /// the connection will be re-established automatically if possible. Please
    /// refer to the [`ConnectionManager`][connection-manager] docs for
    /// detailed reconnecting behavior.
    ///
    /// A connection manager can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    pub async fn get_connection_manager_with_config(
        &self,
        config: crate::aio::ConnectionManagerConfig,
    ) -> RedisResult<crate::aio::ConnectionManager> {
        let client = self.clone();
        crate::aio::ConnectionManager::new_with_config(client, config).await
    }

    /// Creates a multiplexed connection, bounding the whole attempt by
    /// `config.connection_timeout` when one is set.
    ///
    /// An elapsed timeout is converted into the crate's error type.
    async fn get_multiplexed_async_connection_inner_with_timeout<T>(
        &self,
        config: &AsyncConnectionConfig,
        rt: Runtime,
    ) -> RedisResult<crate::aio::MultiplexedConnection>
    where
        T: crate::aio::RedisRuntime,
    {
        // The future is lazy, so building it ahead of the branch does no work yet.
        let connecting = self.get_multiplexed_async_connection_inner::<T>(config);

        match config.connection_timeout {
            Some(connection_timeout) => match rt.timeout(connection_timeout, connecting).await {
                Ok(outcome) => outcome,
                Err(elapsed) => Err(elapsed.into()),
            },
            None => connecting.await,
        }
    }

    /// Creates the connection together with its driver future, spawns the
    /// driver on runtime `T` and stores the resulting task handle on the connection.
    async fn get_multiplexed_async_connection_inner<T>(
        &self,
        config: &AsyncConnectionConfig,
    ) -> RedisResult<crate::aio::MultiplexedConnection>
    where
        T: crate::aio::RedisRuntime,
    {
        let created = self
            .create_multiplexed_async_connection_inner::<T>(config)
            .await;
        let (mut connection, driver) = created?;
        connection.set_task_handle(T::spawn(driver));
        Ok(connection)
    }

    /// Opens the underlying stream and wraps it in a multiplexed connection.
    ///
    /// Uses the resolver from `config` when one is set and
    /// `DefaultAsyncDNSResolver` otherwise. Returns the connection together
    /// with the future that the caller has to drive.
    async fn create_multiplexed_async_connection_inner<T>(
        &self,
        config: &AsyncConnectionConfig,
    ) -> RedisResult<(
        crate::aio::MultiplexedConnection,
        impl std::future::Future<Output = ()> + 'static,
    )>
    where
        T: crate::aio::RedisRuntime,
    {
        let configured_resolver = config.dns_resolver.as_deref();
        let resolver = configured_resolver.unwrap_or(&DefaultAsyncDNSResolver);
        let con = self.get_simple_async_connection::<T>(resolver).await?;

        let redis_info = &self.connection_info.redis;
        crate::aio::MultiplexedConnection::new_with_config(redis_info, con, config.clone()).await
    }

    /// Opens a boxed async stream using whichever runtime `Runtime::locate()` reports.
    async fn get_simple_async_connection_dynamically(
        &self,
        dns_resolver: &dyn AsyncDNSResolver,
    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>> {
        let rt = Runtime::locate();
        match rt {
            #[cfg(feature = "tokio-comp")]
            Runtime::Tokio => {
                self.get_simple_async_connection::<crate::aio::tokio::Tokio>(dns_resolver)
                    .await
            }

            #[cfg(feature = "smol-comp")]
            Runtime::Smol => {
                self.get_simple_async_connection::<crate::aio::smol::Smol>(dns_resolver)
                    .await
            }
        }
    }

    /// Opens an async stream on runtime `T` via `connect_simple` and boxes it.
    async fn get_simple_async_connection<T>(
        &self,
        dns_resolver: &dyn AsyncDNSResolver,
    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>>
    where
        T: crate::aio::RedisRuntime,
    {
        let stream = crate::aio::connect_simple::<T>(&self.connection_info, dns_resolver).await?;
        Ok(stream.boxed())
    }

    /// Crate-internal accessor for the client's connection info.
    #[cfg(feature = "connection-manager")]
    pub(crate) fn connection_info(&self) -> &ConnectionInfo {
        let Client { connection_info } = self;
        connection_info
    }

    /// Returns an async receiver for pub-sub messages.
    ///
    /// The underlying stream is opened with `DefaultAsyncDNSResolver`.
    // TODO - do we want to type-erase pubsub using a trait, to allow us to replace it with a different implementation later?
    pub async fn get_async_pubsub(&self) -> RedisResult<crate::aio::PubSub> {
        let stream = self
            .get_simple_async_connection_dynamically(&DefaultAsyncDNSResolver)
            .await?;
        let redis_info = &self.connection_info.redis;

        crate::aio::PubSub::new(redis_info, stream).await
    }

    /// Returns an async receiver for monitor messages.
    ///
    /// The underlying stream is opened with `DefaultAsyncDNSResolver`.
    pub async fn get_async_monitor(&self) -> RedisResult<crate::aio::Monitor> {
        let stream = self
            .get_simple_async_connection_dynamically(&DefaultAsyncDNSResolver)
            .await?;
        let redis_info = &self.connection_info.redis;

        crate::aio::Monitor::new(redis_info, stream).await
    }
}
620
621#[cfg(feature = "aio")]
622use crate::aio::Runtime;
623
624impl ConnectionLike for Client {
625    fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
626        self.get_connection()?.req_packed_command(cmd)
627    }
628
629    fn req_packed_commands(
630        &mut self,
631        cmd: &[u8],
632        offset: usize,
633        count: usize,
634    ) -> RedisResult<Vec<Value>> {
635        self.get_connection()?
636            .req_packed_commands(cmd, offset, count)
637    }
638
639    fn get_db(&self) -> i64 {
640        self.connection_info.redis.db
641    }
642
643    fn check_connection(&mut self) -> bool {
644        if let Ok(mut conn) = self.get_connection() {
645            conn.check_connection()
646        } else {
647            false
648        }
649    }
650
651    fn is_open(&self) -> bool {
652        if let Ok(conn) = self.get_connection() {
653            conn.is_open()
654        } else {
655            false
656        }
657    }
658}
659
#[cfg(test)]
mod test {
    use super::*;
    use assert_matches::assert_matches;

    /// An IPv6 address carrying an interface suffix must be accepted by `Client::open`.
    #[test]
    fn regression_293_parse_ipv6_with_interface() {
        let address = ("fe80::cafe:beef%eno1", 6379);
        assert_matches!(Client::open(address), Ok(_));
    }

    /// A fresh config leaves the pipeline buffer size unset.
    #[cfg(feature = "aio")]
    #[test]
    fn test_async_connection_config_pipeline_buffer_size_default() {
        let buffer_size = AsyncConnectionConfig::new().pipeline_buffer_size;
        assert_eq!(buffer_size, None);
    }

    /// The pipeline buffer size setter stores the given value.
    #[cfg(feature = "aio")]
    #[test]
    fn test_async_connection_config_pipeline_buffer_size_custom() {
        let buffer_size = AsyncConnectionConfig::new()
            .set_pipeline_buffer_size(100)
            .pipeline_buffer_size;
        assert_eq!(buffer_size, Some(100));
    }

    /// A fresh config leaves the concurrency limit unset.
    #[cfg(feature = "aio")]
    #[test]
    fn test_async_connection_config_concurrency_limit_default() {
        let limit = AsyncConnectionConfig::new().concurrency_limit;
        assert_eq!(limit, None);
    }

    /// The concurrency limit setter stores the given value.
    #[cfg(feature = "aio")]
    #[test]
    fn test_async_connection_config_concurrency_limit_custom() {
        let limit = AsyncConnectionConfig::new()
            .set_concurrency_limit(128)
            .concurrency_limit;
        assert_eq!(limit, Some(128));
    }
}