// redis/client.rs
1use std::time::Duration;
2
3#[cfg(feature = "aio")]
4use crate::aio::{AsyncPushSender, DefaultAsyncDNSResolver};
5#[cfg(feature = "token-based-authentication")]
6use crate::auth::StreamingCredentialsProvider;
7#[cfg(feature = "aio")]
8use crate::io::AsyncDNSResolver;
9use crate::{
10 connection::{Connection, ConnectionInfo, ConnectionLike, IntoConnectionInfo, connect},
11 types::{RedisResult, Value},
12};
13#[cfg(feature = "aio")]
14use std::pin::Pin;
15
16#[cfg(feature = "tls-rustls")]
17use crate::tls::{TlsCertificates, inner_build_with_tls};
18
19#[cfg(feature = "cache-aio")]
20use crate::caching::CacheConfig;
21#[cfg(all(
22 feature = "cache-aio",
23 any(feature = "connection-manager", feature = "cluster-async")
24))]
25use crate::caching::CacheManager;
26
/// The client type.
///
/// A cheap, cloneable handle that stores parsed connection parameters and
/// creates connections on demand; it does not hold an open connection itself.
#[derive(Debug, Clone)]
pub struct Client {
    // Parsed connection parameters, produced by `IntoConnectionInfo` in `open`.
    pub(crate) connection_info: ConnectionInfo,
}
32
/// The client acts as connector to the redis server. By itself it does not
/// do much other than providing a convenient way to fetch a connection from
/// it. In the future the plan is to provide a connection pool in the client.
///
/// When opening a client a URL in the following format should be used:
///
/// ```plain
/// redis://host:port/db
/// ```
///
/// Example usage:
///
/// ```rust,no_run
/// let client = redis::Client::open("redis://127.0.0.1/").unwrap();
/// let con = client.get_connection().unwrap();
/// ```
impl Client {
    /// Connects to a redis server and returns a client. This does not
    /// actually open a connection yet but it does perform some basic
    /// checks on the URL that might make the operation fail.
    pub fn open<T: IntoConnectionInfo>(params: T) -> RedisResult<Client> {
        Ok(Client {
            // Only URL/parameter validation happens here; no I/O is performed.
            connection_info: params.into_connection_info()?,
        })
    }

    /// Instructs the client to actually connect to redis and returns a
    /// connection object. The connection object can be used to send
    /// commands to the server. This can fail with a variety of errors
    /// (like unreachable host) so it's important that you handle those
    /// errors.
    pub fn get_connection(&self) -> RedisResult<Connection> {
        // `None` timeout: the connection attempt is not bounded by this crate.
        connect(&self.connection_info, None)
    }

    /// Instructs the client to actually connect to redis with specified
    /// timeout and returns a connection object. The connection object
    /// can be used to send commands to the server. This can fail with
    /// a variety of errors (like unreachable host) so it's important
    /// that you handle those errors.
    pub fn get_connection_with_timeout(&self, timeout: Duration) -> RedisResult<Connection> {
        connect(&self.connection_info, Some(timeout))
    }

    /// Returns a reference of client connection info object.
    pub fn get_connection_info(&self) -> &ConnectionInfo {
        &self.connection_info
    }

    /// Constructs a new `Client` with parameters necessary to create a TLS connection.
    ///
    /// - `conn_info` - URL using the `rediss://` scheme.
    /// - `tls_certs` - `TlsCertificates` structure containing:
    ///   - `client_tls` - Optional `ClientTlsConfig` containing byte streams for
    ///     - `client_cert` - client's byte stream containing client certificate in PEM format
    ///     - `client_key` - client's byte stream containing private key in PEM format
    ///   - `root_cert` - Optional byte stream yielding PEM formatted file for root certificates.
    ///
    /// If `ClientTlsConfig` ( cert+key pair ) is not provided, then client-side authentication is not enabled.
    /// If `root_cert` is not provided, then system root certificates are used instead.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use std::{fs::File, io::{BufReader, Read}};
    ///
    /// use redis::{Client, AsyncTypedCommands as _, TlsCertificates, ClientTlsConfig};
    ///
    /// async fn do_redis_code(
    ///     url: &str,
    ///     root_cert_file: &str,
    ///     cert_file: &str,
    ///     key_file: &str
    /// ) -> redis::RedisResult<()> {
    ///     let root_cert_file = File::open(root_cert_file).expect("cannot open private cert file");
    ///     let mut root_cert_vec = Vec::new();
    ///     BufReader::new(root_cert_file)
    ///         .read_to_end(&mut root_cert_vec)
    ///         .expect("Unable to read ROOT cert file");
    ///
    ///     let cert_file = File::open(cert_file).expect("cannot open private cert file");
    ///     let mut client_cert_vec = Vec::new();
    ///     BufReader::new(cert_file)
    ///         .read_to_end(&mut client_cert_vec)
    ///         .expect("Unable to read client cert file");
    ///
    ///     let key_file = File::open(key_file).expect("cannot open private key file");
    ///     let mut client_key_vec = Vec::new();
    ///     BufReader::new(key_file)
    ///         .read_to_end(&mut client_key_vec)
    ///         .expect("Unable to read client key file");
    ///
    ///     let client = Client::build_with_tls(
    ///         url,
    ///         TlsCertificates {
    ///             client_tls: Some(ClientTlsConfig{
    ///                 client_cert: client_cert_vec,
    ///                 client_key: client_key_vec,
    ///             }),
    ///             root_cert: Some(root_cert_vec),
    ///         }
    ///     )
    ///     .expect("Unable to build client");
    ///
    ///     let connection_info = client.get_connection_info();
    ///
    ///     println!(">>> connection info: {connection_info:?}");
    ///
    ///     let mut con = client.get_multiplexed_async_connection().await?;
    ///
    ///     con.set("key1", b"foo").await?;
    ///
    ///     redis::cmd("SET")
    ///         .arg(&["key2", "bar"])
    ///         .exec_async(&mut con)
    ///         .await?;
    ///
    ///     let result = redis::cmd("MGET")
    ///         .arg(&["key1", "key2"])
    ///         .query_async(&mut con)
    ///         .await;
    ///     assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
    ///     println!("Result from MGET: {result:?}");
    ///
    ///     Ok(())
    /// }
    /// ```
    #[cfg(feature = "tls-rustls")]
    pub fn build_with_tls<C: IntoConnectionInfo>(
        conn_info: C,
        tls_certs: TlsCertificates,
    ) -> RedisResult<Client> {
        let connection_info = conn_info.into_connection_info()?;

        // Certificates are validated/parsed here; the connection itself is still lazy.
        inner_build_with_tls(connection_info, &tls_certs)
    }
}
170
/// Client-side cache state carried inside an async connection config.
#[cfg(feature = "cache-aio")]
#[derive(Clone)]
pub(crate) enum Cache {
    /// A configuration from which the connection will build its own cache.
    Config(CacheConfig),
    /// An already-constructed manager supplied by connection-manager /
    /// cluster-async code (presumably so a cache survives reconnects —
    /// see `set_cache_manager` callers to confirm).
    #[cfg(any(feature = "connection-manager", feature = "cluster-async"))]
    Manager(CacheManager),
}
178
/// Default cap on how long an async request waits for a server response.
#[cfg(feature = "aio")]
pub(crate) const DEFAULT_RESPONSE_TIMEOUT: Option<Duration> = Some(Duration::from_millis(500));
/// Default cap on how long establishing a single connection may take.
#[cfg(any(feature = "aio", feature = "cluster"))]
pub(crate) const DEFAULT_CONNECTION_TIMEOUT: Option<Duration> = Some(Duration::from_secs(1));
183
/// Options for creation of async connection
#[cfg(feature = "aio")]
#[derive(Clone)]
pub struct AsyncConnectionConfig {
    /// Maximum time to wait for a response from the server
    pub(crate) response_timeout: Option<Duration>,
    /// Maximum time to wait for a connection to be established
    pub(crate) connection_timeout: Option<Duration>,
    // Sink for push values; per `set_push_sender`, only effective with RESP3.
    pub(crate) push_sender: Option<std::sync::Arc<dyn AsyncPushSender>>,
    // Client-side cache: either a config to build from, or a shared manager.
    #[cfg(feature = "cache-aio")]
    pub(crate) cache: Option<Cache>,
    // Custom DNS resolver; `None` falls back to `DefaultAsyncDNSResolver`.
    pub(crate) dns_resolver: Option<std::sync::Arc<dyn AsyncDNSResolver>>,
    // Internal pipeline channel capacity; `None` uses the library default
    // (50, per `set_pipeline_buffer_size` docs).
    pub(crate) pipeline_buffer_size: Option<usize>,
    // Max concurrent in-flight requests; `None` means unlimited.
    pub(crate) concurrency_limit: Option<usize>,
    /// Optional credentials provider for dynamic authentication (e.g., token-based authentication)
    #[cfg(feature = "token-based-authentication")]
    pub(crate) credentials_provider: Option<std::sync::Arc<dyn StreamingCredentialsProvider>>,
}
202
#[cfg(feature = "aio")]
impl Default for AsyncConnectionConfig {
    /// Builds a configuration with the library's default timeouts and every
    /// optional feature (push sender, cache, DNS resolver, limits, credentials)
    /// left unset.
    fn default() -> Self {
        Self {
            response_timeout: DEFAULT_RESPONSE_TIMEOUT,
            connection_timeout: DEFAULT_CONNECTION_TIMEOUT,
            push_sender: None,
            #[cfg(feature = "cache-aio")]
            cache: None,
            dns_resolver: None,
            pipeline_buffer_size: None,
            concurrency_limit: None,
            #[cfg(feature = "token-based-authentication")]
            credentials_provider: None,
        }
    }
}
220
#[cfg(feature = "aio")]
impl AsyncConnectionConfig {
    /// Creates a new instance of the config with all parameters set to default values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Each connection attempt to the server will time out after `connection_timeout`.
    ///
    /// Set `None` if you don't want the connection attempt to time out.
    pub fn set_connection_timeout(mut self, connection_timeout: Option<Duration>) -> Self {
        self.connection_timeout = connection_timeout;
        self
    }

    /// The new connection will time out operations after `response_timeout` has passed.
    ///
    /// Set `None` if you don't want requests to time out.
    pub fn set_response_timeout(mut self, response_timeout: Option<Duration>) -> Self {
        self.response_timeout = response_timeout;
        self
    }

    /// Sets a sender for push values.
    ///
    /// The sender can be a channel, or an arbitrary function that handles [crate::PushInfo] values.
    /// This will fail client creation if the connection isn't configured for RESP3 communications via the [crate::RedisConnectionInfo::set_protocol] function.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use redis::AsyncConnectionConfig;
    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    /// let config = AsyncConnectionConfig::new().set_push_sender(tx);
    /// ```
    ///
    /// ```rust
    /// # use std::sync::{Mutex, Arc};
    /// # use redis::AsyncConnectionConfig;
    /// let messages = Arc::new(Mutex::new(Vec::new()));
    /// let config = AsyncConnectionConfig::new().set_push_sender(move |msg|{
    ///     let Ok(mut messages) = messages.lock() else {
    ///         return Err(redis::aio::SendError);
    ///     };
    ///     messages.push(msg);
    ///     Ok(())
    /// });
    /// ```
    pub fn set_push_sender(self, sender: impl AsyncPushSender) -> Self {
        self.set_push_sender_internal(std::sync::Arc::new(sender))
    }

    /// Crate-internal variant that accepts an already type-erased sender.
    pub(crate) fn set_push_sender_internal(
        mut self,
        sender: std::sync::Arc<dyn AsyncPushSender>,
    ) -> Self {
        self.push_sender = Some(sender);
        self
    }

    /// Sets cache config for MultiplexedConnection, check CacheConfig for more details.
    #[cfg(feature = "cache-aio")]
    pub fn set_cache_config(mut self, cache_config: CacheConfig) -> Self {
        self.cache = Some(Cache::Config(cache_config));
        self
    }

    /// Crate-internal: installs an existing `CacheManager` (used by the
    /// connection-manager / cluster-async features) instead of a fresh config.
    #[cfg(all(
        feature = "cache-aio",
        any(feature = "connection-manager", feature = "cluster-async")
    ))]
    pub(crate) fn set_cache_manager(mut self, cache_manager: CacheManager) -> Self {
        self.cache = Some(Cache::Manager(cache_manager));
        self
    }

    /// Set the DNS resolver for the underlying TCP connection.
    ///
    /// The parameter resolver must implement the [`crate::io::AsyncDNSResolver`] trait.
    pub fn set_dns_resolver(self, dns_resolver: impl AsyncDNSResolver) -> Self {
        self.set_dns_resolver_internal(std::sync::Arc::new(dns_resolver))
    }

    /// Module-internal variant that accepts an already type-erased resolver.
    pub(super) fn set_dns_resolver_internal(
        mut self,
        dns_resolver: std::sync::Arc<dyn AsyncDNSResolver>,
    ) -> Self {
        self.dns_resolver = Some(dns_resolver);
        self
    }

    /// Sets the buffer size for the internal pipeline channel.
    ///
    /// The multiplexed connection uses an internal channel to queue Redis commands
    /// before sending them to the server. This setting controls how many commands
    /// can be buffered in that channel.
    ///
    /// When the buffer is full, callers will asynchronously wait until space becomes
    /// available. A larger buffer allows more commands to be queued during bursts of
    /// activity, reducing wait time for callers. However, this comes at the cost of
    /// increased memory usage.
    ///
    /// The default value is 50. Consider increasing this value for high-concurrency
    /// scenarios (e.g., web servers handling many simultaneous requests) where
    /// buffer contention may increase overall latency and cause upstream timeouts.
    pub fn set_pipeline_buffer_size(mut self, size: usize) -> Self {
        self.pipeline_buffer_size = Some(size);
        self
    }

    /// Sets the maximum number of concurrent in-flight requests on this connection.
    ///
    /// When set, at most `limit` requests can be awaiting a response at any given time.
    /// Additional requests will wait until an in-flight request completes.
    ///
    /// Pipelined commands try to acquire one permit per command, but will proceed with
    /// fewer if not all are immediately available. This means a pipeline may temporarily
    /// push the effective in-flight count above the limit.
    ///
    /// This is useful for preventing a large backlog of commands from building up when the
    /// server becomes slow or unresponsive. Without a limit, requests continue to queue
    /// unboundedly. When the server is degraded, requests near the back of the queue spend
    /// most of their time waiting behind earlier requests and are likely to hit their response
    /// timeout before the server even processes them -- wasting work on both sides. Setting a
    /// concurrency limit caps the number of in-flight requests, so backpressure is applied
    /// earlier and fewer requests are lost to timeouts.
    ///
    /// By default there is no limit.
    pub fn set_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit = Some(limit);
        self
    }

    /// Sets a credentials provider for dynamic authentication (e.g., token-based authentication).
    ///
    /// This is useful for authentication mechanisms that require periodic credential refresh,
    /// such as Microsoft Entra ID (formerly Azure AD).
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # #[cfg(feature = "entra-id")]
    /// # {
    /// use redis::{AsyncConnectionConfig, EntraIdCredentialsProvider, RetryConfig};
    ///
    /// # async fn example() -> redis::RedisResult<()> {
    /// let mut provider = EntraIdCredentialsProvider::new_developer_tools()?;
    /// provider.start(RetryConfig::default());
    ///
    /// let config = AsyncConnectionConfig::new()
    ///     .set_credentials_provider(provider);
    /// # Ok(())
    /// # }
    /// # }
    /// ```
    #[cfg(feature = "token-based-authentication")]
    pub fn set_credentials_provider<P>(self, provider: P) -> Self
    where
        P: StreamingCredentialsProvider + 'static,
    {
        self.set_credentials_provider_internal(std::sync::Arc::new(provider))
    }

    /// Crate-internal variant that accepts an already type-erased provider.
    #[cfg(feature = "token-based-authentication")]
    pub(crate) fn set_credentials_provider_internal(
        mut self,
        provider: std::sync::Arc<dyn StreamingCredentialsProvider>,
    ) -> Self {
        self.credentials_provider = Some(provider);
        self
    }
}
393
/// To enable async support you need to choose one of the supported runtimes and activate its
/// corresponding feature: `tokio-comp` or `smol-comp`
#[cfg(feature = "aio")]
#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
impl Client {
    /// Returns an async connection from the client.
    #[cfg(feature = "aio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
    pub async fn get_multiplexed_async_connection(
        &self,
    ) -> RedisResult<crate::aio::MultiplexedConnection> {
        self.get_multiplexed_async_connection_with_config(&AsyncConnectionConfig::new())
            .await
    }

    /// Returns an async connection from the client.
    #[cfg(feature = "aio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
    pub async fn get_multiplexed_async_connection_with_config(
        &self,
        config: &AsyncConnectionConfig,
    ) -> RedisResult<crate::aio::MultiplexedConnection> {
        // Dispatch on whichever runtime is active at call time; each arm only
        // exists when its feature is compiled in.
        match Runtime::locate() {
            #[cfg(feature = "tokio-comp")]
            rt @ Runtime::Tokio => self
                .get_multiplexed_async_connection_inner_with_timeout::<crate::aio::tokio::Tokio>(
                    config, rt,
                )
                .await,

            #[cfg(feature = "smol-comp")]
            rt @ Runtime::Smol => {
                self.get_multiplexed_async_connection_inner_with_timeout::<crate::aio::smol::Smol>(
                    config, rt,
                )
                .await
            }
        }
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
    ///
    /// The connection manager wraps a
    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
    /// connection fails with a connection error, then a new connection is
    /// established in the background and the error is returned to the caller.
    ///
    /// This means that on connection loss at least one command will fail, but
    /// the connection will be re-established automatically if possible. Please
    /// refer to the [`ConnectionManager`][connection-manager] docs for
    /// detailed reconnecting behavior.
    ///
    /// A connection manager can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    pub async fn get_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
        crate::aio::ConnectionManager::new(self.clone()).await
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client without establishing a connection.
    ///
    /// The connection will be established lazily on the first request.
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    pub fn get_connection_manager_lazy(
        &self,
        config: crate::aio::ConnectionManagerConfig,
    ) -> RedisResult<crate::aio::ConnectionManager> {
        crate::aio::ConnectionManager::new_lazy_with_config(self.clone(), config)
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
    ///
    /// The connection manager wraps a
    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
    /// connection fails with a connection error, then a new connection is
    /// established in the background and the error is returned to the caller.
    ///
    /// This means that on connection loss at least one command will fail, but
    /// the connection will be re-established automatically if possible. Please
    /// refer to the [`ConnectionManager`][connection-manager] docs for
    /// detailed reconnecting behavior.
    ///
    /// A connection manager can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    pub async fn get_connection_manager_with_config(
        &self,
        config: crate::aio::ConnectionManagerConfig,
    ) -> RedisResult<crate::aio::ConnectionManager> {
        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
    }

    /// Wraps the inner connection setup in `config.connection_timeout` (when set)
    /// using the located runtime's timer.
    async fn get_multiplexed_async_connection_inner_with_timeout<T>(
        &self,
        config: &AsyncConnectionConfig,
        rt: Runtime,
    ) -> RedisResult<crate::aio::MultiplexedConnection>
    where
        T: crate::aio::RedisRuntime,
    {
        let result = if let Some(connection_timeout) = config.connection_timeout {
            rt.timeout(
                connection_timeout,
                self.get_multiplexed_async_connection_inner::<T>(config),
            )
            .await
        } else {
            // No timeout configured: wrap in `Ok` so both branches share the
            // `Result<RedisResult<_>, Elapsed>` shape matched below.
            Ok(self
                .get_multiplexed_async_connection_inner::<T>(config)
                .await)
        };

        match result {
            Ok(Ok(connection)) => Ok(connection),
            Ok(Err(e)) => Err(e),
            Err(elapsed) => Err(elapsed.into()),
        }
    }

    /// Creates the multiplexed connection and spawns its driver future on the
    /// runtime, attaching the task handle to the connection.
    async fn get_multiplexed_async_connection_inner<T>(
        &self,
        config: &AsyncConnectionConfig,
    ) -> RedisResult<crate::aio::MultiplexedConnection>
    where
        T: crate::aio::RedisRuntime,
    {
        let (mut connection, driver) = self
            .create_multiplexed_async_connection_inner::<T>(config)
            .await?;
        let handle = T::spawn(driver);
        connection.set_task_handle(handle);
        Ok(connection)
    }

    /// Opens the raw stream and builds the multiplexed connection plus its
    /// (not-yet-spawned) driver future.
    async fn create_multiplexed_async_connection_inner<T>(
        &self,
        config: &AsyncConnectionConfig,
    ) -> RedisResult<(
        crate::aio::MultiplexedConnection,
        impl std::future::Future<Output = ()> + 'static,
    )>
    where
        T: crate::aio::RedisRuntime,
    {
        // Fall back to the library's default resolver when none was configured.
        let resolver = config
            .dns_resolver
            .as_deref()
            .unwrap_or(&DefaultAsyncDNSResolver);
        let con = self.get_simple_async_connection::<T>(resolver).await?;
        crate::aio::MultiplexedConnection::new_with_config(
            &self.connection_info.redis,
            con,
            config.clone(),
        )
        .await
    }

    /// Runtime-dispatching wrapper around `get_simple_async_connection`.
    async fn get_simple_async_connection_dynamically(
        &self,
        dns_resolver: &dyn AsyncDNSResolver,
    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>> {
        match Runtime::locate() {
            #[cfg(feature = "tokio-comp")]
            Runtime::Tokio => {
                self.get_simple_async_connection::<crate::aio::tokio::Tokio>(dns_resolver)
                    .await
            }

            #[cfg(feature = "smol-comp")]
            Runtime::Smol => {
                self.get_simple_async_connection::<crate::aio::smol::Smol>(dns_resolver)
                    .await
            }
        }
    }

    /// Opens a plain (non-multiplexed) async stream to the server and boxes it.
    async fn get_simple_async_connection<T>(
        &self,
        dns_resolver: &dyn AsyncDNSResolver,
    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>>
    where
        T: crate::aio::RedisRuntime,
    {
        Ok(
            crate::aio::connect_simple::<T>(&self.connection_info, dns_resolver)
                .await?
                .boxed(),
        )
    }

    #[cfg(feature = "connection-manager")]
    pub(crate) fn connection_info(&self) -> &ConnectionInfo {
        &self.connection_info
    }

    /// Returns an async receiver for pub-sub messages.
    #[cfg(feature = "aio")]
    // TODO - do we want to type-erase pubsub using a trait, to allow us to replace it with a different implementation later?
    pub async fn get_async_pubsub(&self) -> RedisResult<crate::aio::PubSub> {
        let connection = self
            .get_simple_async_connection_dynamically(&DefaultAsyncDNSResolver)
            .await?;

        crate::aio::PubSub::new(&self.connection_info.redis, connection).await
    }

    /// Returns an async receiver for monitor messages.
    #[cfg(feature = "aio")]
    pub async fn get_async_monitor(&self) -> RedisResult<crate::aio::Monitor> {
        let connection = self
            .get_simple_async_connection_dynamically(&DefaultAsyncDNSResolver)
            .await?;
        crate::aio::Monitor::new(&self.connection_info.redis, connection).await
    }
}
620
621#[cfg(feature = "aio")]
622use crate::aio::Runtime;
623
624impl ConnectionLike for Client {
625 fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
626 self.get_connection()?.req_packed_command(cmd)
627 }
628
629 fn req_packed_commands(
630 &mut self,
631 cmd: &[u8],
632 offset: usize,
633 count: usize,
634 ) -> RedisResult<Vec<Value>> {
635 self.get_connection()?
636 .req_packed_commands(cmd, offset, count)
637 }
638
639 fn get_db(&self) -> i64 {
640 self.connection_info.redis.db
641 }
642
643 fn check_connection(&mut self) -> bool {
644 if let Ok(mut conn) = self.get_connection() {
645 conn.check_connection()
646 } else {
647 false
648 }
649 }
650
651 fn is_open(&self) -> bool {
652 if let Ok(conn) = self.get_connection() {
653 conn.is_open()
654 } else {
655 false
656 }
657 }
658}
659
#[cfg(test)]
mod test {
    use super::*;
    use assert_matches::assert_matches;

    // Regression test for issue #293: IPv6 addresses with a scope/interface
    // suffix (e.g. `%eno1`) must parse into a valid client.
    #[test]
    fn regression_293_parse_ipv6_with_interface() {
        assert_matches!(Client::open(("fe80::cafe:beef%eno1", 6379)), Ok(_));
    }

    // By default no pipeline buffer size is set (library default applies).
    #[cfg(feature = "aio")]
    #[test]
    fn test_async_connection_config_pipeline_buffer_size_default() {
        let config = AsyncConnectionConfig::new();
        assert_eq!(config.pipeline_buffer_size, None);
    }

    // A caller-supplied buffer size is stored verbatim.
    #[cfg(feature = "aio")]
    #[test]
    fn test_async_connection_config_pipeline_buffer_size_custom() {
        let config = AsyncConnectionConfig::new().set_pipeline_buffer_size(100);
        assert_eq!(config.pipeline_buffer_size, Some(100));
    }

    // By default there is no concurrency limit.
    #[cfg(feature = "aio")]
    #[test]
    fn test_async_connection_config_concurrency_limit_default() {
        let config = AsyncConnectionConfig::new();
        assert_eq!(config.concurrency_limit, None);
    }

    // A caller-supplied concurrency limit is stored verbatim.
    #[cfg(feature = "aio")]
    #[test]
    fn test_async_connection_config_concurrency_limit_custom() {
        let config = AsyncConnectionConfig::new().set_concurrency_limit(128);
        assert_eq!(config.concurrency_limit, Some(128));
    }
}