redis/client.rs
1use std::time::Duration;
2
3#[cfg(feature = "aio")]
4use crate::aio::{AsyncPushSender, DefaultAsyncDNSResolver};
5#[cfg(feature = "aio")]
6use crate::io::{tcp::TcpSettings, AsyncDNSResolver};
7use crate::{
8 connection::{connect, Connection, ConnectionInfo, ConnectionLike, IntoConnectionInfo},
9 types::{RedisResult, Value},
10};
11#[cfg(feature = "aio")]
12use std::pin::Pin;
13
14#[cfg(feature = "tls-rustls")]
15use crate::tls::{inner_build_with_tls, TlsCertificates};
16
17#[cfg(feature = "cache-aio")]
18use crate::caching::CacheConfig;
19#[cfg(all(feature = "cache-aio", feature = "connection-manager"))]
20use crate::caching::CacheManager;
21
22/// The client type.
23#[derive(Debug, Clone)]
24pub struct Client {
25 pub(crate) connection_info: ConnectionInfo,
26}
27
28/// The client acts as connector to the redis server. By itself it does not
29/// do much other than providing a convenient way to fetch a connection from
30/// it. In the future the plan is to provide a connection pool in the client.
31///
32/// When opening a client a URL in the following format should be used:
33///
34/// ```plain
35/// redis://host:port/db
36/// ```
37///
38/// Example usage::
39///
40/// ```rust,no_run
41/// let client = redis::Client::open("redis://127.0.0.1/").unwrap();
42/// let con = client.get_connection().unwrap();
43/// ```
44impl Client {
45 /// Connects to a redis server and returns a client. This does not
46 /// actually open a connection yet but it does perform some basic
47 /// checks on the URL that might make the operation fail.
48 pub fn open<T: IntoConnectionInfo>(params: T) -> RedisResult<Client> {
49 Ok(Client {
50 connection_info: params.into_connection_info()?,
51 })
52 }
53
54 /// Instructs the client to actually connect to redis and returns a
55 /// connection object. The connection object can be used to send
56 /// commands to the server. This can fail with a variety of errors
57 /// (like unreachable host) so it's important that you handle those
58 /// errors.
59 pub fn get_connection(&self) -> RedisResult<Connection> {
60 connect(&self.connection_info, None)
61 }
62
63 /// Instructs the client to actually connect to redis with specified
64 /// timeout and returns a connection object. The connection object
65 /// can be used to send commands to the server. This can fail with
66 /// a variety of errors (like unreachable host) so it's important
67 /// that you handle those errors.
68 pub fn get_connection_with_timeout(&self, timeout: Duration) -> RedisResult<Connection> {
69 connect(&self.connection_info, Some(timeout))
70 }
71
72 /// Returns a reference of client connection info object.
73 pub fn get_connection_info(&self) -> &ConnectionInfo {
74 &self.connection_info
75 }
76
77 /// Constructs a new `Client` with parameters necessary to create a TLS connection.
78 ///
79 /// - `conn_info` - URL using the `rediss://` scheme.
80 /// - `tls_certs` - `TlsCertificates` structure containing:
81 /// - `client_tls` - Optional `ClientTlsConfig` containing byte streams for
82 /// - `client_cert` - client's byte stream containing client certificate in PEM format
83 /// - `client_key` - client's byte stream containing private key in PEM format
84 /// - `root_cert` - Optional byte stream yielding PEM formatted file for root certificates.
85 ///
86 /// If `ClientTlsConfig` ( cert+key pair ) is not provided, then client-side authentication is not enabled.
87 /// If `root_cert` is not provided, then system root certificates are used instead.
88 ///
89 /// # Examples
90 ///
91 /// ```no_run
92 /// use std::{fs::File, io::{BufReader, Read}};
93 ///
94 /// use redis::{Client, AsyncCommands as _, TlsCertificates, ClientTlsConfig};
95 ///
96 /// async fn do_redis_code(
97 /// url: &str,
98 /// root_cert_file: &str,
99 /// cert_file: &str,
100 /// key_file: &str
101 /// ) -> redis::RedisResult<()> {
102 /// let root_cert_file = File::open(root_cert_file).expect("cannot open private cert file");
103 /// let mut root_cert_vec = Vec::new();
104 /// BufReader::new(root_cert_file)
105 /// .read_to_end(&mut root_cert_vec)
106 /// .expect("Unable to read ROOT cert file");
107 ///
108 /// let cert_file = File::open(cert_file).expect("cannot open private cert file");
109 /// let mut client_cert_vec = Vec::new();
110 /// BufReader::new(cert_file)
111 /// .read_to_end(&mut client_cert_vec)
112 /// .expect("Unable to read client cert file");
113 ///
114 /// let key_file = File::open(key_file).expect("cannot open private key file");
115 /// let mut client_key_vec = Vec::new();
116 /// BufReader::new(key_file)
117 /// .read_to_end(&mut client_key_vec)
118 /// .expect("Unable to read client key file");
119 ///
120 /// let client = Client::build_with_tls(
121 /// url,
122 /// TlsCertificates {
123 /// client_tls: Some(ClientTlsConfig{
124 /// client_cert: client_cert_vec,
125 /// client_key: client_key_vec,
126 /// }),
127 /// root_cert: Some(root_cert_vec),
128 /// }
129 /// )
130 /// .expect("Unable to build client");
131 ///
132 /// let connection_info = client.get_connection_info();
133 ///
134 /// println!(">>> connection info: {connection_info:?}");
135 ///
136 /// let mut con = client.get_multiplexed_async_connection().await?;
137 ///
138 /// con.set("key1", b"foo").await?;
139 ///
140 /// redis::cmd("SET")
141 /// .arg(&["key2", "bar"])
142 /// .exec_async(&mut con)
143 /// .await?;
144 ///
145 /// let result = redis::cmd("MGET")
146 /// .arg(&["key1", "key2"])
147 /// .query_async(&mut con)
148 /// .await;
149 /// assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
150 /// println!("Result from MGET: {result:?}");
151 ///
152 /// Ok(())
153 /// }
154 /// ```
155 #[cfg(feature = "tls-rustls")]
156 pub fn build_with_tls<C: IntoConnectionInfo>(
157 conn_info: C,
158 tls_certs: TlsCertificates,
159 ) -> RedisResult<Client> {
160 let connection_info = conn_info.into_connection_info()?;
161
162 inner_build_with_tls(connection_info, &tls_certs)
163 }
164}
165
166#[cfg(feature = "cache-aio")]
167#[derive(Clone)]
168pub(crate) enum Cache {
169 Config(CacheConfig),
170 #[cfg(feature = "connection-manager")]
171 Manager(CacheManager),
172}
173
174/// Options for creation of async connection
175#[cfg(feature = "aio")]
176#[derive(Clone, Default)]
177pub struct AsyncConnectionConfig {
178 /// Maximum time to wait for a response from the server
179 pub(crate) response_timeout: Option<std::time::Duration>,
180 /// Maximum time to wait for a connection to be established
181 pub(crate) connection_timeout: Option<std::time::Duration>,
182 pub(crate) push_sender: Option<std::sync::Arc<dyn AsyncPushSender>>,
183 #[cfg(feature = "cache-aio")]
184 pub(crate) cache: Option<Cache>,
185 pub(crate) tcp_settings: TcpSettings,
186 pub(crate) dns_resolver: Option<std::sync::Arc<dyn AsyncDNSResolver>>,
187}
188
189#[cfg(feature = "aio")]
190impl AsyncConnectionConfig {
191 /// Creates a new instance of the options with nothing set
192 pub fn new() -> Self {
193 Self::default()
194 }
195
196 /// Sets the connection timeout
197 pub fn set_connection_timeout(mut self, connection_timeout: std::time::Duration) -> Self {
198 self.connection_timeout = Some(connection_timeout);
199 self
200 }
201
202 /// Sets the response timeout
203 pub fn set_response_timeout(mut self, response_timeout: std::time::Duration) -> Self {
204 self.response_timeout = Some(response_timeout);
205 self
206 }
207
208 /// Sets sender sender for push values.
209 ///
210 /// The sender can be a channel, or an arbitrary function that handles [crate::PushInfo] values.
211 /// This will fail client creation if the connection isn't configured for RESP3 communications via the [crate::RedisConnectionInfo::protocol] field.
212 ///
213 /// # Examples
214 ///
215 /// ```rust
216 /// # use redis::AsyncConnectionConfig;
217 /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
218 /// let config = AsyncConnectionConfig::new().set_push_sender(tx);
219 /// ```
220 ///
221 /// ```rust
222 /// # use std::sync::{Mutex, Arc};
223 /// # use redis::AsyncConnectionConfig;
224 /// let messages = Arc::new(Mutex::new(Vec::new()));
225 /// let config = AsyncConnectionConfig::new().set_push_sender(move |msg|{
226 /// let Ok(mut messages) = messages.lock() else {
227 /// return Err(redis::aio::SendError);
228 /// };
229 /// messages.push(msg);
230 /// Ok(())
231 /// });
232 /// ```
233 pub fn set_push_sender(self, sender: impl AsyncPushSender) -> Self {
234 self.set_push_sender_internal(std::sync::Arc::new(sender))
235 }
236
237 pub(crate) fn set_push_sender_internal(
238 mut self,
239 sender: std::sync::Arc<dyn AsyncPushSender>,
240 ) -> Self {
241 self.push_sender = Some(sender);
242 self
243 }
244
245 /// Sets cache config for MultiplexedConnection, check CacheConfig for more details.
246 #[cfg(feature = "cache-aio")]
247 pub fn set_cache_config(mut self, cache_config: CacheConfig) -> Self {
248 self.cache = Some(Cache::Config(cache_config));
249 self
250 }
251
252 #[cfg(all(feature = "cache-aio", feature = "connection-manager"))]
253 pub(crate) fn set_cache_manager(mut self, cache_manager: CacheManager) -> Self {
254 self.cache = Some(Cache::Manager(cache_manager));
255 self
256 }
257
258 /// Set the behavior of the underlying TCP connection.
259 pub fn set_tcp_settings(self, tcp_settings: crate::io::tcp::TcpSettings) -> Self {
260 Self {
261 tcp_settings,
262 ..self
263 }
264 }
265
266 /// Set the DNS resolver for the underlying TCP connection.
267 ///
268 /// The parameter resolver must implement the [`crate::aio::DNSResolver`] trait.
269 pub fn set_dns_resolver(self, dns_resolver: impl AsyncDNSResolver) -> Self {
270 self.set_dns_resolver_internal(std::sync::Arc::new(dns_resolver))
271 }
272
273 pub(super) fn set_dns_resolver_internal(
274 mut self,
275 dns_resolver: std::sync::Arc<dyn AsyncDNSResolver>,
276 ) -> Self {
277 self.dns_resolver = Some(dns_resolver);
278 self
279 }
280}
281
282/// To enable async support you need to chose one of the supported runtimes and active its
283/// corresponding feature: `tokio-comp` or `async-std-comp`
284#[cfg(feature = "aio")]
285#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
286impl Client {
287 /// Returns an async connection from the client.
288 #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
289 #[deprecated(
290 note = "aio::Connection is deprecated. Use client::get_multiplexed_async_connection instead."
291 )]
292 #[allow(deprecated)]
293 pub async fn get_async_connection(&self) -> RedisResult<crate::aio::Connection> {
294 let con = self
295 .get_simple_async_connection_dynamically(
296 &DefaultAsyncDNSResolver,
297 &TcpSettings::default(),
298 )
299 .await?;
300
301 crate::aio::Connection::new(&self.connection_info.redis, con).await
302 }
303
304 /// Returns an async connection from the client.
305 #[cfg(feature = "tokio-comp")]
306 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
307 #[deprecated(
308 note = "aio::Connection is deprecated. Use client::get_multiplexed_async_connection instead."
309 )]
310 #[allow(deprecated)]
311 pub async fn get_tokio_connection(&self) -> RedisResult<crate::aio::Connection> {
312 use crate::aio::RedisRuntime;
313 Ok(
314 crate::aio::connect::<crate::aio::tokio::Tokio>(&self.connection_info)
315 .await?
316 .map(RedisRuntime::boxed),
317 )
318 }
319
320 /// Returns an async connection from the client.
321 #[cfg(feature = "async-std-comp")]
322 #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
323 #[deprecated(
324 note = "aio::Connection is deprecated. Use client::get_multiplexed_async_std_connection instead."
325 )]
326 #[allow(deprecated)]
327 pub async fn get_async_std_connection(&self) -> RedisResult<crate::aio::Connection> {
328 use crate::aio::RedisRuntime;
329 Ok(
330 crate::aio::connect::<crate::aio::async_std::AsyncStd>(&self.connection_info)
331 .await?
332 .map(RedisRuntime::boxed),
333 )
334 }
335
336 /// Returns an async connection from the client.
337 #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
338 #[cfg_attr(
339 docsrs,
340 doc(cfg(any(feature = "tokio-comp", feature = "async-std-comp")))
341 )]
342 pub async fn get_multiplexed_async_connection(
343 &self,
344 ) -> RedisResult<crate::aio::MultiplexedConnection> {
345 self.get_multiplexed_async_connection_with_config(&AsyncConnectionConfig::new())
346 .await
347 }
348
349 /// Returns an async connection from the client.
350 #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
351 #[cfg_attr(
352 docsrs,
353 doc(cfg(any(feature = "tokio-comp", feature = "async-std-comp")))
354 )]
355 #[deprecated(note = "Use `get_multiplexed_async_connection_with_config` instead")]
356 pub async fn get_multiplexed_async_connection_with_timeouts(
357 &self,
358 response_timeout: std::time::Duration,
359 connection_timeout: std::time::Duration,
360 ) -> RedisResult<crate::aio::MultiplexedConnection> {
361 self.get_multiplexed_async_connection_with_config(
362 &AsyncConnectionConfig::new()
363 .set_connection_timeout(connection_timeout)
364 .set_response_timeout(response_timeout),
365 )
366 .await
367 }
368
369 /// Returns an async connection from the client.
370 #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
371 #[cfg_attr(
372 docsrs,
373 doc(cfg(any(feature = "tokio-comp", feature = "async-std-comp")))
374 )]
375 pub async fn get_multiplexed_async_connection_with_config(
376 &self,
377 config: &AsyncConnectionConfig,
378 ) -> RedisResult<crate::aio::MultiplexedConnection> {
379 let result = match Runtime::locate() {
380 #[cfg(feature = "tokio-comp")]
381 rt @ Runtime::Tokio => {
382 if let Some(connection_timeout) = config.connection_timeout {
383 rt.timeout(
384 connection_timeout,
385 self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
386 config,
387 ),
388 )
389 .await
390 } else {
391 Ok(self
392 .get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(config)
393 .await)
394 }
395 }
396 #[cfg(feature = "async-std-comp")]
397 rt @ Runtime::AsyncStd => {
398 if let Some(connection_timeout) = config.connection_timeout {
399 rt.timeout(
400 connection_timeout,
401 self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
402 config,
403 ),
404 )
405 .await
406 } else {
407 Ok(self
408 .get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
409 config,
410 )
411 .await)
412 }
413 }
414 };
415
416 match result {
417 Ok(Ok(connection)) => Ok(connection),
418 Ok(Err(e)) => Err(e),
419 Err(elapsed) => Err(elapsed.into()),
420 }
421 }
422
423 /// Returns an async multiplexed connection from the client.
424 ///
425 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
426 /// on the same underlying connection (tcp/unix socket).
427 #[cfg(feature = "tokio-comp")]
428 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
429 pub async fn get_multiplexed_tokio_connection_with_response_timeouts(
430 &self,
431 response_timeout: std::time::Duration,
432 connection_timeout: std::time::Duration,
433 ) -> RedisResult<crate::aio::MultiplexedConnection> {
434 let result = Runtime::locate()
435 .timeout(
436 connection_timeout,
437 self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
438 &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
439 ),
440 )
441 .await;
442
443 match result {
444 Ok(Ok(connection)) => Ok(connection),
445 Ok(Err(e)) => Err(e),
446 Err(elapsed) => Err(elapsed.into()),
447 }
448 }
449
450 /// Returns an async multiplexed connection from the client.
451 ///
452 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
453 /// on the same underlying connection (tcp/unix socket).
454 #[cfg(feature = "tokio-comp")]
455 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
456 pub async fn get_multiplexed_tokio_connection(
457 &self,
458 ) -> RedisResult<crate::aio::MultiplexedConnection> {
459 self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
460 &AsyncConnectionConfig::new(),
461 )
462 .await
463 }
464
465 /// Returns an async multiplexed connection from the client.
466 ///
467 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
468 /// on the same underlying connection (tcp/unix socket).
469 #[cfg(feature = "async-std-comp")]
470 #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
471 pub async fn get_multiplexed_async_std_connection_with_timeouts(
472 &self,
473 response_timeout: std::time::Duration,
474 connection_timeout: std::time::Duration,
475 ) -> RedisResult<crate::aio::MultiplexedConnection> {
476 let result = Runtime::locate()
477 .timeout(
478 connection_timeout,
479 self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
480 &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
481 ),
482 )
483 .await;
484
485 match result {
486 Ok(Ok(connection)) => Ok(connection),
487 Ok(Err(e)) => Err(e),
488 Err(elapsed) => Err(elapsed.into()),
489 }
490 }
491
492 /// Returns an async multiplexed connection from the client.
493 ///
494 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
495 /// on the same underlying connection (tcp/unix socket).
496 #[cfg(feature = "async-std-comp")]
497 #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
498 pub async fn get_multiplexed_async_std_connection(
499 &self,
500 ) -> RedisResult<crate::aio::MultiplexedConnection> {
501 self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
502 &AsyncConnectionConfig::new(),
503 )
504 .await
505 }
506
507 /// Returns an async multiplexed connection from the client and a future which must be polled
508 /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
509 ///
510 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
511 /// on the same underlying connection (tcp/unix socket).
512 /// The multiplexer will return a timeout error on any request that takes longer then `response_timeout`.
513 #[cfg(feature = "tokio-comp")]
514 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
515 pub async fn create_multiplexed_tokio_connection_with_response_timeout(
516 &self,
517 response_timeout: std::time::Duration,
518 ) -> RedisResult<(
519 crate::aio::MultiplexedConnection,
520 impl std::future::Future<Output = ()>,
521 )> {
522 self.create_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
523 &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
524 )
525 .await
526 }
527
528 /// Returns an async multiplexed connection from the client and a future which must be polled
529 /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
530 ///
531 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
532 /// on the same underlying connection (tcp/unix socket).
533 #[cfg(feature = "tokio-comp")]
534 #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
535 pub async fn create_multiplexed_tokio_connection(
536 &self,
537 ) -> RedisResult<(
538 crate::aio::MultiplexedConnection,
539 impl std::future::Future<Output = ()>,
540 )> {
541 self.create_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
542 &AsyncConnectionConfig::new(),
543 )
544 .await
545 }
546
547 /// Returns an async multiplexed connection from the client and a future which must be polled
548 /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
549 ///
550 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
551 /// on the same underlying connection (tcp/unix socket).
552 /// The multiplexer will return a timeout error on any request that takes longer then `response_timeout`.
553 #[cfg(feature = "async-std-comp")]
554 #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
555 pub async fn create_multiplexed_async_std_connection_with_response_timeout(
556 &self,
557 response_timeout: std::time::Duration,
558 ) -> RedisResult<(
559 crate::aio::MultiplexedConnection,
560 impl std::future::Future<Output = ()>,
561 )> {
562 self.create_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
563 &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
564 )
565 .await
566 }
567
568 /// Returns an async multiplexed connection from the client and a future which must be polled
569 /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
570 ///
571 /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
572 /// on the same underlying connection (tcp/unix socket).
573 #[cfg(feature = "async-std-comp")]
574 #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
575 pub async fn create_multiplexed_async_std_connection(
576 &self,
577 ) -> RedisResult<(
578 crate::aio::MultiplexedConnection,
579 impl std::future::Future<Output = ()>,
580 )> {
581 self.create_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
582 &AsyncConnectionConfig::new(),
583 )
584 .await
585 }
586
587 /// Returns an async [`ConnectionManager`][connection-manager] from the client.
588 ///
589 /// The connection manager wraps a
590 /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
591 /// connection fails with a connection error, then a new connection is
592 /// established in the background and the error is returned to the caller.
593 ///
594 /// This means that on connection loss at least one command will fail, but
595 /// the connection will be re-established automatically if possible. Please
596 /// refer to the [`ConnectionManager`][connection-manager] docs for
597 /// detailed reconnecting behavior.
598 ///
599 /// A connection manager can be cloned, allowing requests to be sent concurrently
600 /// on the same underlying connection (tcp/unix socket).
601 ///
602 /// [connection-manager]: aio/struct.ConnectionManager.html
603 /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
604 #[cfg(feature = "connection-manager")]
605 #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
606 #[deprecated(note = "use get_connection_manager instead")]
607 pub async fn get_tokio_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
608 crate::aio::ConnectionManager::new(self.clone()).await
609 }
610
611 /// Returns an async [`ConnectionManager`][connection-manager] from the client.
612 ///
613 /// The connection manager wraps a
614 /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
615 /// connection fails with a connection error, then a new connection is
616 /// established in the background and the error is returned to the caller.
617 ///
618 /// This means that on connection loss at least one command will fail, but
619 /// the connection will be re-established automatically if possible. Please
620 /// refer to the [`ConnectionManager`][connection-manager] docs for
621 /// detailed reconnecting behavior.
622 ///
623 /// A connection manager can be cloned, allowing requests to be sent concurrently
624 /// on the same underlying connection (tcp/unix socket).
625 ///
626 /// [connection-manager]: aio/struct.ConnectionManager.html
627 /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
628 #[cfg(feature = "connection-manager")]
629 #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
630 pub async fn get_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
631 crate::aio::ConnectionManager::new(self.clone()).await
632 }
633
634 /// Returns an async [`ConnectionManager`][connection-manager] from the client.
635 ///
636 /// The connection manager wraps a
637 /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
638 /// connection fails with a connection error, then a new connection is
639 /// established in the background and the error is returned to the caller.
640 ///
641 /// This means that on connection loss at least one command will fail, but
642 /// the connection will be re-established automatically if possible. Please
643 /// refer to the [`ConnectionManager`][connection-manager] docs for
644 /// detailed reconnecting behavior.
645 ///
646 /// A connection manager can be cloned, allowing requests to be sent concurrently
647 /// on the same underlying connection (tcp/unix socket).
648 ///
649 /// [connection-manager]: aio/struct.ConnectionManager.html
650 /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
651 #[cfg(feature = "connection-manager")]
652 #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
653 #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
654 pub async fn get_tokio_connection_manager_with_backoff(
655 &self,
656 exponent_base: u64,
657 factor: u64,
658 number_of_retries: usize,
659 ) -> RedisResult<crate::aio::ConnectionManager> {
660 use crate::aio::ConnectionManagerConfig;
661
662 let config = ConnectionManagerConfig::new()
663 .set_exponent_base(exponent_base)
664 .set_factor(factor)
665 .set_number_of_retries(number_of_retries);
666 crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
667 }
668
669 /// Returns an async [`ConnectionManager`][connection-manager] from the client.
670 ///
671 /// The connection manager wraps a
672 /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
673 /// connection fails with a connection error, then a new connection is
674 /// established in the background and the error is returned to the caller.
675 ///
676 /// This means that on connection loss at least one command will fail, but
677 /// the connection will be re-established automatically if possible. Please
678 /// refer to the [`ConnectionManager`][connection-manager] docs for
679 /// detailed reconnecting behavior.
680 ///
681 /// A connection manager can be cloned, allowing requests to be sent concurrently
682 /// on the same underlying connection (tcp/unix socket).
683 ///
684 /// [connection-manager]: aio/struct.ConnectionManager.html
685 /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
686 #[cfg(feature = "connection-manager")]
687 #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
688 #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
689 pub async fn get_tokio_connection_manager_with_backoff_and_timeouts(
690 &self,
691 exponent_base: u64,
692 factor: u64,
693 number_of_retries: usize,
694 response_timeout: std::time::Duration,
695 connection_timeout: std::time::Duration,
696 ) -> RedisResult<crate::aio::ConnectionManager> {
697 use crate::aio::ConnectionManagerConfig;
698
699 let config = ConnectionManagerConfig::new()
700 .set_exponent_base(exponent_base)
701 .set_factor(factor)
702 .set_response_timeout(response_timeout)
703 .set_connection_timeout(connection_timeout)
704 .set_number_of_retries(number_of_retries);
705 crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
706 }
707
708 /// Returns an async [`ConnectionManager`][connection-manager] from the client.
709 ///
710 /// The connection manager wraps a
711 /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
712 /// connection fails with a connection error, then a new connection is
713 /// established in the background and the error is returned to the caller.
714 ///
715 /// This means that on connection loss at least one command will fail, but
716 /// the connection will be re-established automatically if possible. Please
717 /// refer to the [`ConnectionManager`][connection-manager] docs for
718 /// detailed reconnecting behavior.
719 ///
720 /// A connection manager can be cloned, allowing requests to be sent concurrently
721 /// on the same underlying connection (tcp/unix socket).
722 ///
723 /// [connection-manager]: aio/struct.ConnectionManager.html
724 /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
725 #[cfg(feature = "connection-manager")]
726 #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
727 #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
728 pub async fn get_connection_manager_with_backoff_and_timeouts(
729 &self,
730 exponent_base: u64,
731 factor: u64,
732 number_of_retries: usize,
733 response_timeout: std::time::Duration,
734 connection_timeout: std::time::Duration,
735 ) -> RedisResult<crate::aio::ConnectionManager> {
736 use crate::aio::ConnectionManagerConfig;
737
738 let config = ConnectionManagerConfig::new()
739 .set_exponent_base(exponent_base)
740 .set_factor(factor)
741 .set_response_timeout(response_timeout)
742 .set_connection_timeout(connection_timeout)
743 .set_number_of_retries(number_of_retries);
744 crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
745 }
746
747 /// Returns an async [`ConnectionManager`][connection-manager] from the client.
748 ///
749 /// The connection manager wraps a
750 /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
751 /// connection fails with a connection error, then a new connection is
752 /// established in the background and the error is returned to the caller.
753 ///
754 /// This means that on connection loss at least one command will fail, but
755 /// the connection will be re-established automatically if possible. Please
756 /// refer to the [`ConnectionManager`][connection-manager] docs for
757 /// detailed reconnecting behavior.
758 ///
759 /// A connection manager can be cloned, allowing requests to be sent concurrently
760 /// on the same underlying connection (tcp/unix socket).
761 ///
762 /// [connection-manager]: aio/struct.ConnectionManager.html
763 /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
764 #[cfg(feature = "connection-manager")]
765 #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
766 pub async fn get_connection_manager_with_config(
767 &self,
768 config: crate::aio::ConnectionManagerConfig,
769 ) -> RedisResult<crate::aio::ConnectionManager> {
770 crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
771 }
772
773 /// Returns an async [`ConnectionManager`][connection-manager] from the client.
774 ///
775 /// The connection manager wraps a
776 /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
777 /// connection fails with a connection error, then a new connection is
778 /// established in the background and the error is returned to the caller.
779 ///
780 /// This means that on connection loss at least one command will fail, but
781 /// the connection will be re-established automatically if possible. Please
782 /// refer to the [`ConnectionManager`][connection-manager] docs for
783 /// detailed reconnecting behavior.
784 ///
785 /// A connection manager can be cloned, allowing requests to be be sent concurrently
786 /// on the same underlying connection (tcp/unix socket).
787 ///
788 /// [connection-manager]: aio/struct.ConnectionManager.html
789 /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
790 #[cfg(feature = "connection-manager")]
791 #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
792 #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
793 pub async fn get_connection_manager_with_backoff(
794 &self,
795 exponent_base: u64,
796 factor: u64,
797 number_of_retries: usize,
798 ) -> RedisResult<crate::aio::ConnectionManager> {
799 use crate::aio::ConnectionManagerConfig;
800
801 let config = ConnectionManagerConfig::new()
802 .set_exponent_base(exponent_base)
803 .set_factor(factor)
804 .set_number_of_retries(number_of_retries);
805 crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
806 }
807
808 async fn get_multiplexed_async_connection_inner<T>(
809 &self,
810 config: &AsyncConnectionConfig,
811 ) -> RedisResult<crate::aio::MultiplexedConnection>
812 where
813 T: crate::aio::RedisRuntime,
814 {
815 let (mut connection, driver) = self
816 .create_multiplexed_async_connection_inner::<T>(config)
817 .await?;
818 let handle = T::spawn(driver);
819 connection.set_task_handle(handle);
820 Ok(connection)
821 }
822
823 async fn create_multiplexed_async_connection_inner<T>(
824 &self,
825 config: &AsyncConnectionConfig,
826 ) -> RedisResult<(
827 crate::aio::MultiplexedConnection,
828 impl std::future::Future<Output = ()>,
829 )>
830 where
831 T: crate::aio::RedisRuntime,
832 {
833 let resolver = config
834 .dns_resolver
835 .as_deref()
836 .unwrap_or(&DefaultAsyncDNSResolver);
837 let con = self
838 .get_simple_async_connection::<T>(resolver, &config.tcp_settings)
839 .await?;
840 crate::aio::MultiplexedConnection::new_with_config(
841 &self.connection_info.redis,
842 con,
843 config.clone(),
844 )
845 .await
846 }
847
848 async fn get_simple_async_connection_dynamically(
849 &self,
850 dns_resolver: &dyn AsyncDNSResolver,
851 tcp_settings: &TcpSettings,
852 ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>> {
853 match Runtime::locate() {
854 #[cfg(feature = "tokio-comp")]
855 Runtime::Tokio => {
856 self.get_simple_async_connection::<crate::aio::tokio::Tokio>(
857 dns_resolver,
858 tcp_settings,
859 )
860 .await
861 }
862
863 #[cfg(feature = "async-std-comp")]
864 Runtime::AsyncStd => {
865 self.get_simple_async_connection::<crate::aio::async_std::AsyncStd>(
866 dns_resolver,
867 tcp_settings,
868 )
869 .await
870 }
871 }
872 }
873
874 async fn get_simple_async_connection<T>(
875 &self,
876 dns_resolver: &dyn AsyncDNSResolver,
877 tcp_settings: &TcpSettings,
878 ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>>
879 where
880 T: crate::aio::RedisRuntime,
881 {
882 Ok(
883 crate::aio::connect_simple::<T>(&self.connection_info, dns_resolver, tcp_settings)
884 .await?
885 .boxed(),
886 )
887 }
888
889 #[cfg(feature = "connection-manager")]
890 pub(crate) fn connection_info(&self) -> &ConnectionInfo {
891 &self.connection_info
892 }
893
894 /// Returns an async receiver for pub-sub messages.
895 #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
896 // TODO - do we want to type-erase pubsub using a trait, to allow us to replace it with a different implementation later?
897 pub async fn get_async_pubsub(&self) -> RedisResult<crate::aio::PubSub> {
898 let connection = self
899 .get_simple_async_connection_dynamically(
900 &DefaultAsyncDNSResolver,
901 &TcpSettings::default(),
902 )
903 .await?;
904
905 crate::aio::PubSub::new(&self.connection_info.redis, connection).await
906 }
907
908 /// Returns an async receiver for monitor messages.
909 #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
910 // TODO - do we want to type-erase monitor using a trait, to allow us to replace it with a different implementation later?
911 pub async fn get_async_monitor(&self) -> RedisResult<crate::aio::Monitor> {
912 #[allow(deprecated)]
913 self.get_async_connection()
914 .await
915 .map(|connection| connection.into_monitor())
916 }
917}
918
919#[cfg(feature = "aio")]
920use crate::aio::Runtime;
921
922impl ConnectionLike for Client {
923 fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
924 self.get_connection()?.req_packed_command(cmd)
925 }
926
927 fn req_packed_commands(
928 &mut self,
929 cmd: &[u8],
930 offset: usize,
931 count: usize,
932 ) -> RedisResult<Vec<Value>> {
933 self.get_connection()?
934 .req_packed_commands(cmd, offset, count)
935 }
936
937 fn get_db(&self) -> i64 {
938 self.connection_info.redis.db
939 }
940
941 fn check_connection(&mut self) -> bool {
942 if let Ok(mut conn) = self.get_connection() {
943 conn.check_connection()
944 } else {
945 false
946 }
947 }
948
949 fn is_open(&self) -> bool {
950 if let Ok(conn) = self.get_connection() {
951 conn.is_open()
952 } else {
953 false
954 }
955 }
956}
957
958#[cfg(test)]
959mod test {
960 use super::*;
961
962 #[test]
963 fn regression_293_parse_ipv6_with_interface() {
964 assert!(Client::open(("fe80::cafe:beef%eno1", 6379)).is_ok());
965 }
966}