redis/
connection.rs

1use std::borrow::Cow;
2use std::collections::VecDeque;
3use std::fmt;
4use std::io::{self, Write};
5use std::net::{self, SocketAddr, TcpStream, ToSocketAddrs};
6use std::ops::DerefMut;
7use std::path::PathBuf;
8use std::str::{from_utf8, FromStr};
9use std::time::{Duration, Instant};
10
11use crate::cmd::{cmd, pipe, Cmd};
12use crate::io::tcp::{stream_with_settings, TcpSettings};
13use crate::parser::Parser;
14use crate::pipeline::Pipeline;
15use crate::types::{
16    from_redis_value, ErrorKind, FromRedisValue, HashMap, PushKind, RedisError, RedisResult,
17    ServerError, ServerErrorKind, SyncPushSender, ToRedisArgs, Value,
18};
19use crate::{from_owned_redis_value, ProtocolVersion};
20
21#[cfg(unix)]
22use std::os::unix::net::UnixStream;
23
24use crate::commands::resp3_hello;
25#[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
26use native_tls::{TlsConnector, TlsStream};
27
28#[cfg(feature = "tls-rustls")]
29use rustls::{RootCertStore, StreamOwned};
30#[cfg(feature = "tls-rustls")]
31use std::sync::Arc;
32
33use crate::PushInfo;
34
35#[cfg(all(
36    feature = "tls-rustls",
37    not(feature = "tls-native-tls"),
38    not(feature = "tls-rustls-webpki-roots")
39))]
40use rustls_native_certs::load_native_certs;
41
42#[cfg(feature = "tls-rustls")]
43use crate::tls::ClientTlsParams;
44
45// Non-exhaustive to prevent construction outside this crate
46#[derive(Clone, Debug)]
47#[non_exhaustive]
48pub struct TlsConnParams {
49    #[cfg(feature = "tls-rustls")]
50    pub(crate) client_tls_params: Option<ClientTlsParams>,
51    #[cfg(feature = "tls-rustls")]
52    pub(crate) root_cert_store: Option<RootCertStore>,
53    #[cfg(any(feature = "tls-rustls-insecure", feature = "tls-native-tls"))]
54    pub(crate) danger_accept_invalid_hostnames: bool,
55}
56
57static DEFAULT_PORT: u16 = 6379;
58
59#[inline(always)]
60fn connect_tcp(addr: (&str, u16)) -> io::Result<TcpStream> {
61    let socket = TcpStream::connect(addr)?;
62    stream_with_settings(socket, &TcpSettings::default())
63}
64
65#[inline(always)]
66fn connect_tcp_timeout(addr: &SocketAddr, timeout: Duration) -> io::Result<TcpStream> {
67    let socket = TcpStream::connect_timeout(addr, timeout)?;
68    stream_with_settings(socket, &TcpSettings::default())
69}
70
71/// This function takes a redis URL string and parses it into a URL
72/// as used by rust-url.
73///
74/// This is necessary as the default parser does not understand how redis URLs function.
75pub fn parse_redis_url(input: &str) -> Option<url::Url> {
76    match url::Url::parse(input) {
77        Ok(result) => match result.scheme() {
78            "redis" | "rediss" | "valkey" | "valkeys" | "redis+unix" | "valkey+unix" | "unix" => {
79                Some(result)
80            }
81            _ => None,
82        },
83        Err(_) => None,
84    }
85}
86
87/// TlsMode indicates use or do not use verification of certification.
88///
89/// Check [ConnectionAddr](ConnectionAddr::TcpTls::insecure) for more.
90#[derive(Clone, Copy, PartialEq)]
91pub enum TlsMode {
92    /// Secure verify certification.
93    Secure,
94    /// Insecure do not verify certification.
95    Insecure,
96}
97
98/// Defines the connection address.
99///
100/// Not all connection addresses are supported on all platforms.  For instance
101/// to connect to a unix socket you need to run this on an operating system
102/// that supports them.
103#[derive(Clone, Debug)]
104pub enum ConnectionAddr {
105    /// Format for this is `(host, port)`.
106    Tcp(String, u16),
107    /// Format for this is `(host, port)`.
108    TcpTls {
109        /// Hostname
110        host: String,
111        /// Port
112        port: u16,
113        /// Disable hostname verification when connecting.
114        ///
115        /// # Warning
116        ///
117        /// You should think very carefully before you use this method. If hostname
118        /// verification is not used, any valid certificate for any site will be
119        /// trusted for use from any other. This introduces a significant
120        /// vulnerability to man-in-the-middle attacks.
121        insecure: bool,
122
123        /// TLS certificates and client key.
124        tls_params: Option<TlsConnParams>,
125    },
126    /// Format for this is the path to the unix socket.
127    Unix(PathBuf),
128}
129
130impl PartialEq for ConnectionAddr {
131    fn eq(&self, other: &Self) -> bool {
132        match (self, other) {
133            (ConnectionAddr::Tcp(host1, port1), ConnectionAddr::Tcp(host2, port2)) => {
134                host1 == host2 && port1 == port2
135            }
136            (
137                ConnectionAddr::TcpTls {
138                    host: host1,
139                    port: port1,
140                    insecure: insecure1,
141                    tls_params: _,
142                },
143                ConnectionAddr::TcpTls {
144                    host: host2,
145                    port: port2,
146                    insecure: insecure2,
147                    tls_params: _,
148                },
149            ) => port1 == port2 && host1 == host2 && insecure1 == insecure2,
150            (ConnectionAddr::Unix(path1), ConnectionAddr::Unix(path2)) => path1 == path2,
151            _ => false,
152        }
153    }
154}
155
156impl Eq for ConnectionAddr {}
157
158impl ConnectionAddr {
159    /// Checks if this address is supported.
160    ///
161    /// Because not all platforms support all connection addresses this is a
162    /// quick way to figure out if a connection method is supported. Currently
163    /// this affects:
164    ///
165    /// - Unix socket addresses, which are supported only on Unix
166    ///
167    /// - TLS addresses, which are supported only if a TLS feature is enabled
168    ///   (either `tls-native-tls` or `tls-rustls`).
169    pub fn is_supported(&self) -> bool {
170        match *self {
171            ConnectionAddr::Tcp(_, _) => true,
172            ConnectionAddr::TcpTls { .. } => {
173                cfg!(any(feature = "tls-native-tls", feature = "tls-rustls"))
174            }
175            ConnectionAddr::Unix(_) => cfg!(unix),
176        }
177    }
178
179    /// Configure this address to connect without checking certificate hostnames.
180    ///
181    /// # Warning
182    ///
183    /// You should think very carefully before you use this method. If hostname
184    /// verification is not used, any valid certificate for any site will be
185    /// trusted for use from any other. This introduces a significant
186    /// vulnerability to man-in-the-middle attacks.
187    #[cfg(any(feature = "tls-rustls-insecure", feature = "tls-native-tls"))]
188    pub fn set_danger_accept_invalid_hostnames(&mut self, insecure: bool) {
189        if let ConnectionAddr::TcpTls { tls_params, .. } = self {
190            if let Some(ref mut params) = tls_params {
191                params.danger_accept_invalid_hostnames = insecure;
192            } else if insecure {
193                *tls_params = Some(TlsConnParams {
194                    #[cfg(feature = "tls-rustls")]
195                    client_tls_params: None,
196                    #[cfg(feature = "tls-rustls")]
197                    root_cert_store: None,
198                    danger_accept_invalid_hostnames: insecure,
199                });
200            }
201        }
202    }
203
204    #[cfg(feature = "cluster")]
205    pub(crate) fn tls_mode(&self) -> Option<TlsMode> {
206        match self {
207            ConnectionAddr::TcpTls { insecure, .. } => {
208                if *insecure {
209                    Some(TlsMode::Insecure)
210                } else {
211                    Some(TlsMode::Secure)
212                }
213            }
214            _ => None,
215        }
216    }
217}
218
219impl fmt::Display for ConnectionAddr {
220    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
221        // Cluster::get_connection_info depends on the return value from this function
222        match *self {
223            ConnectionAddr::Tcp(ref host, port) => write!(f, "{host}:{port}"),
224            ConnectionAddr::TcpTls { ref host, port, .. } => write!(f, "{host}:{port}"),
225            ConnectionAddr::Unix(ref path) => write!(f, "{}", path.display()),
226        }
227    }
228}
229
230/// Holds the connection information that redis should use for connecting.
231#[derive(Clone, Debug)]
232pub struct ConnectionInfo {
233    /// A connection address for where to connect to.
234    pub addr: ConnectionAddr,
235
236    /// A redis connection info for how to handshake with redis.
237    pub redis: RedisConnectionInfo,
238}
239
240/// Redis specific/connection independent information used to establish a connection to redis.
241#[derive(Clone, Debug, Default)]
242pub struct RedisConnectionInfo {
243    /// The database number to use.  This is usually `0`.
244    pub db: i64,
245    /// Optionally a username that should be used for connection.
246    pub username: Option<String>,
247    /// Optionally a password that should be used for connection.
248    pub password: Option<String>,
249    /// Version of the protocol to use.
250    pub protocol: ProtocolVersion,
251}
252
253impl FromStr for ConnectionInfo {
254    type Err = RedisError;
255
256    fn from_str(s: &str) -> Result<Self, Self::Err> {
257        s.into_connection_info()
258    }
259}
260
261/// Converts an object into a connection info struct.  This allows the
262/// constructor of the client to accept connection information in a
263/// range of different formats.
264pub trait IntoConnectionInfo {
265    /// Converts the object into a connection info object.
266    fn into_connection_info(self) -> RedisResult<ConnectionInfo>;
267}
268
269impl IntoConnectionInfo for ConnectionInfo {
270    fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
271        Ok(self)
272    }
273}
274
275/// URL format: `{redis|rediss|valkey|valkeys}://[<username>][:<password>@]<hostname>[:port][/<db>]`
276///
277/// - Basic: `redis://127.0.0.1:6379`
278/// - Username & Password: `redis://user:password@127.0.0.1:6379`
279/// - Password only: `redis://:password@127.0.0.1:6379`
280/// - Specifying DB: `redis://127.0.0.1:6379/0`
281/// - Enabling TLS: `rediss://127.0.0.1:6379`
282/// - Enabling Insecure TLS: `rediss://127.0.0.1:6379/#insecure`
283/// - Enabling RESP3: `redis://127.0.0.1:6379/?protocol=resp3`
284impl IntoConnectionInfo for &str {
285    fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
286        match parse_redis_url(self) {
287            Some(u) => u.into_connection_info(),
288            None => fail!((ErrorKind::InvalidClientConfig, "Redis URL did not parse")),
289        }
290    }
291}
292
293impl<T> IntoConnectionInfo for (T, u16)
294where
295    T: Into<String>,
296{
297    fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
298        Ok(ConnectionInfo {
299            addr: ConnectionAddr::Tcp(self.0.into(), self.1),
300            redis: RedisConnectionInfo::default(),
301        })
302    }
303}
304
305/// URL format: `{redis|rediss|valkey|valkeys}://[<username>][:<password>@]<hostname>[:port][/<db>]`
306///
307/// - Basic: `redis://127.0.0.1:6379`
308/// - Username & Password: `redis://user:password@127.0.0.1:6379`
309/// - Password only: `redis://:password@127.0.0.1:6379`
310/// - Specifying DB: `redis://127.0.0.1:6379/0`
311/// - Enabling TLS: `rediss://127.0.0.1:6379`
312/// - Enabling Insecure TLS: `rediss://127.0.0.1:6379/#insecure`
313/// - Enabling RESP3: `redis://127.0.0.1:6379/?protocol=resp3`
314impl IntoConnectionInfo for String {
315    fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
316        match parse_redis_url(&self) {
317            Some(u) => u.into_connection_info(),
318            None => fail!((ErrorKind::InvalidClientConfig, "Redis URL did not parse")),
319        }
320    }
321}
322
323fn parse_protocol(query: &HashMap<Cow<str>, Cow<str>>) -> RedisResult<ProtocolVersion> {
324    Ok(match query.get("protocol") {
325        Some(protocol) => {
326            if protocol == "2" || protocol == "resp2" {
327                ProtocolVersion::RESP2
328            } else if protocol == "3" || protocol == "resp3" {
329                ProtocolVersion::RESP3
330            } else {
331                fail!((
332                    ErrorKind::InvalidClientConfig,
333                    "Invalid protocol version",
334                    protocol.to_string()
335                ))
336            }
337        }
338        None => ProtocolVersion::RESP2,
339    })
340}
341
342fn url_to_tcp_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
343    let host = match url.host() {
344        Some(host) => {
345            // Here we manually match host's enum arms and call their to_string().
346            // Because url.host().to_string() will add `[` and `]` for ipv6:
347            // https://docs.rs/url/latest/src/url/host.rs.html#170
348            // And these brackets will break host.parse::<Ipv6Addr>() when
349            // `client.open()` - `ActualConnection::new()` - `addr.to_socket_addrs()`:
350            // https://doc.rust-lang.org/src/std/net/addr.rs.html#963
351            // https://doc.rust-lang.org/src/std/net/parser.rs.html#158
352            // IpAddr string with brackets can ONLY parse to SocketAddrV6:
353            // https://doc.rust-lang.org/src/std/net/parser.rs.html#255
354            // But if we call Ipv6Addr.to_string directly, it follows rfc5952 without brackets:
355            // https://doc.rust-lang.org/src/std/net/ip.rs.html#1755
356            match host {
357                url::Host::Domain(path) => path.to_string(),
358                url::Host::Ipv4(v4) => v4.to_string(),
359                url::Host::Ipv6(v6) => v6.to_string(),
360            }
361        }
362        None => fail!((ErrorKind::InvalidClientConfig, "Missing hostname")),
363    };
364    let port = url.port().unwrap_or(DEFAULT_PORT);
365    let addr = if url.scheme() == "rediss" || url.scheme() == "valkeys" {
366        #[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
367        {
368            match url.fragment() {
369                Some("insecure") => ConnectionAddr::TcpTls {
370                    host,
371                    port,
372                    insecure: true,
373                    tls_params: None,
374                },
375                Some(_) => fail!((
376                    ErrorKind::InvalidClientConfig,
377                    "only #insecure is supported as URL fragment"
378                )),
379                _ => ConnectionAddr::TcpTls {
380                    host,
381                    port,
382                    insecure: false,
383                    tls_params: None,
384                },
385            }
386        }
387
388        #[cfg(not(any(feature = "tls-native-tls", feature = "tls-rustls")))]
389        fail!((
390            ErrorKind::InvalidClientConfig,
391            "can't connect with TLS, the feature is not enabled"
392        ));
393    } else {
394        ConnectionAddr::Tcp(host, port)
395    };
396    let query: HashMap<_, _> = url.query_pairs().collect();
397    Ok(ConnectionInfo {
398        addr,
399        redis: RedisConnectionInfo {
400            db: match url.path().trim_matches('/') {
401                "" => 0,
402                path => path.parse::<i64>().map_err(|_| -> RedisError {
403                    (ErrorKind::InvalidClientConfig, "Invalid database number").into()
404                })?,
405            },
406            username: if url.username().is_empty() {
407                None
408            } else {
409                match percent_encoding::percent_decode(url.username().as_bytes()).decode_utf8() {
410                    Ok(decoded) => Some(decoded.into_owned()),
411                    Err(_) => fail!((
412                        ErrorKind::InvalidClientConfig,
413                        "Username is not valid UTF-8 string"
414                    )),
415                }
416            },
417            password: match url.password() {
418                Some(pw) => match percent_encoding::percent_decode(pw.as_bytes()).decode_utf8() {
419                    Ok(decoded) => Some(decoded.into_owned()),
420                    Err(_) => fail!((
421                        ErrorKind::InvalidClientConfig,
422                        "Password is not valid UTF-8 string"
423                    )),
424                },
425                None => None,
426            },
427            protocol: parse_protocol(&query)?,
428        },
429    })
430}
431
432#[cfg(unix)]
433fn url_to_unix_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
434    let query: HashMap<_, _> = url.query_pairs().collect();
435    Ok(ConnectionInfo {
436        addr: ConnectionAddr::Unix(url.to_file_path().map_err(|_| -> RedisError {
437            (ErrorKind::InvalidClientConfig, "Missing path").into()
438        })?),
439        redis: RedisConnectionInfo {
440            db: match query.get("db") {
441                Some(db) => db.parse::<i64>().map_err(|_| -> RedisError {
442                    (ErrorKind::InvalidClientConfig, "Invalid database number").into()
443                })?,
444
445                None => 0,
446            },
447            username: query.get("user").map(|username| username.to_string()),
448            password: query.get("pass").map(|password| password.to_string()),
449            protocol: parse_protocol(&query)?,
450        },
451    })
452}
453
454#[cfg(not(unix))]
455fn url_to_unix_connection_info(_: url::Url) -> RedisResult<ConnectionInfo> {
456    fail!((
457        ErrorKind::InvalidClientConfig,
458        "Unix sockets are not available on this platform."
459    ));
460}
461
462impl IntoConnectionInfo for url::Url {
463    fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
464        match self.scheme() {
465            "redis" | "rediss" | "valkey" | "valkeys" => url_to_tcp_connection_info(self),
466            "unix" | "redis+unix" | "valkey+unix" => url_to_unix_connection_info(self),
467            _ => fail!((
468                ErrorKind::InvalidClientConfig,
469                "URL provided is not a redis URL"
470            )),
471        }
472    }
473}
474
475struct TcpConnection {
476    reader: TcpStream,
477    open: bool,
478}
479
480#[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
481struct TcpNativeTlsConnection {
482    reader: TlsStream<TcpStream>,
483    open: bool,
484}
485
486#[cfg(feature = "tls-rustls")]
487struct TcpRustlsConnection {
488    reader: StreamOwned<rustls::ClientConnection, TcpStream>,
489    open: bool,
490}
491
492#[cfg(unix)]
493struct UnixConnection {
494    sock: UnixStream,
495    open: bool,
496}
497
498enum ActualConnection {
499    Tcp(TcpConnection),
500    #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
501    TcpNativeTls(Box<TcpNativeTlsConnection>),
502    #[cfg(feature = "tls-rustls")]
503    TcpRustls(Box<TcpRustlsConnection>),
504    #[cfg(unix)]
505    Unix(UnixConnection),
506}
507
508#[cfg(feature = "tls-rustls-insecure")]
509struct NoCertificateVerification {
510    supported: rustls::crypto::WebPkiSupportedAlgorithms,
511}
512
513#[cfg(feature = "tls-rustls-insecure")]
514impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
515    fn verify_server_cert(
516        &self,
517        _end_entity: &rustls::pki_types::CertificateDer<'_>,
518        _intermediates: &[rustls::pki_types::CertificateDer<'_>],
519        _server_name: &rustls::pki_types::ServerName<'_>,
520        _ocsp_response: &[u8],
521        _now: rustls::pki_types::UnixTime,
522    ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
523        Ok(rustls::client::danger::ServerCertVerified::assertion())
524    }
525
526    fn verify_tls12_signature(
527        &self,
528        _message: &[u8],
529        _cert: &rustls::pki_types::CertificateDer<'_>,
530        _dss: &rustls::DigitallySignedStruct,
531    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
532        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
533    }
534
535    fn verify_tls13_signature(
536        &self,
537        _message: &[u8],
538        _cert: &rustls::pki_types::CertificateDer<'_>,
539        _dss: &rustls::DigitallySignedStruct,
540    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
541        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
542    }
543
544    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
545        self.supported.supported_schemes()
546    }
547}
548
549#[cfg(feature = "tls-rustls-insecure")]
550impl fmt::Debug for NoCertificateVerification {
551    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
552        f.debug_struct("NoCertificateVerification").finish()
553    }
554}
555
556/// Insecure `ServerCertVerifier` for rustls that implements `danger_accept_invalid_hostnames`.
557#[cfg(feature = "tls-rustls-insecure")]
558#[derive(Debug)]
559struct AcceptInvalidHostnamesCertVerifier {
560    inner: Arc<rustls::client::WebPkiServerVerifier>,
561}
562
563#[cfg(feature = "tls-rustls-insecure")]
564fn is_hostname_error(err: &rustls::Error) -> bool {
565    matches!(
566        err,
567        rustls::Error::InvalidCertificate(
568            rustls::CertificateError::NotValidForName
569                | rustls::CertificateError::NotValidForNameContext { .. }
570        )
571    )
572}
573
574#[cfg(feature = "tls-rustls-insecure")]
575impl rustls::client::danger::ServerCertVerifier for AcceptInvalidHostnamesCertVerifier {
576    fn verify_server_cert(
577        &self,
578        end_entity: &rustls::pki_types::CertificateDer<'_>,
579        intermediates: &[rustls::pki_types::CertificateDer<'_>],
580        server_name: &rustls::pki_types::ServerName<'_>,
581        ocsp_response: &[u8],
582        now: rustls::pki_types::UnixTime,
583    ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
584        self.inner
585            .verify_server_cert(end_entity, intermediates, server_name, ocsp_response, now)
586            .or_else(|err| {
587                if is_hostname_error(&err) {
588                    Ok(rustls::client::danger::ServerCertVerified::assertion())
589                } else {
590                    Err(err)
591                }
592            })
593    }
594
595    fn verify_tls12_signature(
596        &self,
597        message: &[u8],
598        cert: &rustls::pki_types::CertificateDer<'_>,
599        dss: &rustls::DigitallySignedStruct,
600    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
601        self.inner
602            .verify_tls12_signature(message, cert, dss)
603            .or_else(|err| {
604                if is_hostname_error(&err) {
605                    Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
606                } else {
607                    Err(err)
608                }
609            })
610    }
611
612    fn verify_tls13_signature(
613        &self,
614        message: &[u8],
615        cert: &rustls::pki_types::CertificateDer<'_>,
616        dss: &rustls::DigitallySignedStruct,
617    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
618        self.inner
619            .verify_tls13_signature(message, cert, dss)
620            .or_else(|err| {
621                if is_hostname_error(&err) {
622                    Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
623                } else {
624                    Err(err)
625                }
626            })
627    }
628
629    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
630        self.inner.supported_verify_schemes()
631    }
632}
633
634/// Represents a stateful redis TCP connection.
635pub struct Connection {
636    con: ActualConnection,
637    parser: Parser,
638    db: i64,
639
640    /// Flag indicating whether the connection was left in the PubSub state after dropping `PubSub`.
641    ///
642    /// This flag is checked when attempting to send a command, and if it's raised, we attempt to
643    /// exit the pubsub state before executing the new request.
644    pubsub: bool,
645
646    // Field indicating which protocol to use for server communications.
647    protocol: ProtocolVersion,
648
649    /// This is used to manage Push messages in RESP3 mode.
650    push_sender: Option<SyncPushSender>,
651
652    /// The number of messages that are expected to be returned from the server,
653    /// but the user no longer waits for - answers for requests that already returned a transient error.
654    messages_to_skip: usize,
655}
656
657/// Represents a pubsub connection.
658pub struct PubSub<'a> {
659    con: &'a mut Connection,
660    waiting_messages: VecDeque<Msg>,
661}
662
663/// Represents a pubsub message.
664#[derive(Debug, Clone)]
665pub struct Msg {
666    payload: Value,
667    channel: Value,
668    pattern: Option<Value>,
669}
670
671impl ActualConnection {
672    pub fn new(addr: &ConnectionAddr, timeout: Option<Duration>) -> RedisResult<ActualConnection> {
673        Ok(match *addr {
674            ConnectionAddr::Tcp(ref host, ref port) => {
675                let addr = (host.as_str(), *port);
676                let tcp = match timeout {
677                    None => connect_tcp(addr)?,
678                    Some(timeout) => {
679                        let mut tcp = None;
680                        let mut last_error = None;
681                        for addr in addr.to_socket_addrs()? {
682                            match connect_tcp_timeout(&addr, timeout) {
683                                Ok(l) => {
684                                    tcp = Some(l);
685                                    break;
686                                }
687                                Err(e) => {
688                                    last_error = Some(e);
689                                }
690                            };
691                        }
692                        match (tcp, last_error) {
693                            (Some(tcp), _) => tcp,
694                            (None, Some(e)) => {
695                                fail!(e);
696                            }
697                            (None, None) => {
698                                fail!((
699                                    ErrorKind::InvalidClientConfig,
700                                    "could not resolve to any addresses"
701                                ));
702                            }
703                        }
704                    }
705                };
706                ActualConnection::Tcp(TcpConnection {
707                    reader: tcp,
708                    open: true,
709                })
710            }
711            #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
712            ConnectionAddr::TcpTls {
713                ref host,
714                port,
715                insecure,
716                ref tls_params,
717            } => {
718                let tls_connector = if insecure {
719                    TlsConnector::builder()
720                        .danger_accept_invalid_certs(true)
721                        .danger_accept_invalid_hostnames(true)
722                        .use_sni(false)
723                        .build()?
724                } else if let Some(params) = tls_params {
725                    TlsConnector::builder()
726                        .danger_accept_invalid_hostnames(params.danger_accept_invalid_hostnames)
727                        .build()?
728                } else {
729                    TlsConnector::new()?
730                };
731                let addr = (host.as_str(), port);
732                let tls = match timeout {
733                    None => {
734                        let tcp = connect_tcp(addr)?;
735                        match tls_connector.connect(host, tcp) {
736                            Ok(res) => res,
737                            Err(e) => {
738                                fail!((ErrorKind::IoError, "SSL Handshake error", e.to_string()));
739                            }
740                        }
741                    }
742                    Some(timeout) => {
743                        let mut tcp = None;
744                        let mut last_error = None;
745                        for addr in (host.as_str(), port).to_socket_addrs()? {
746                            match connect_tcp_timeout(&addr, timeout) {
747                                Ok(l) => {
748                                    tcp = Some(l);
749                                    break;
750                                }
751                                Err(e) => {
752                                    last_error = Some(e);
753                                }
754                            };
755                        }
756                        match (tcp, last_error) {
757                            (Some(tcp), _) => tls_connector.connect(host, tcp).unwrap(),
758                            (None, Some(e)) => {
759                                fail!(e);
760                            }
761                            (None, None) => {
762                                fail!((
763                                    ErrorKind::InvalidClientConfig,
764                                    "could not resolve to any addresses"
765                                ));
766                            }
767                        }
768                    }
769                };
770                ActualConnection::TcpNativeTls(Box::new(TcpNativeTlsConnection {
771                    reader: tls,
772                    open: true,
773                }))
774            }
775            #[cfg(feature = "tls-rustls")]
776            ConnectionAddr::TcpTls {
777                ref host,
778                port,
779                insecure,
780                ref tls_params,
781            } => {
782                let host: &str = host;
783                let config = create_rustls_config(insecure, tls_params.clone())?;
784                let conn = rustls::ClientConnection::new(
785                    Arc::new(config),
786                    rustls::pki_types::ServerName::try_from(host)?.to_owned(),
787                )?;
788                let reader = match timeout {
789                    None => {
790                        let tcp = connect_tcp((host, port))?;
791                        StreamOwned::new(conn, tcp)
792                    }
793                    Some(timeout) => {
794                        let mut tcp = None;
795                        let mut last_error = None;
796                        for addr in (host, port).to_socket_addrs()? {
797                            match connect_tcp_timeout(&addr, timeout) {
798                                Ok(l) => {
799                                    tcp = Some(l);
800                                    break;
801                                }
802                                Err(e) => {
803                                    last_error = Some(e);
804                                }
805                            };
806                        }
807                        match (tcp, last_error) {
808                            (Some(tcp), _) => StreamOwned::new(conn, tcp),
809                            (None, Some(e)) => {
810                                fail!(e);
811                            }
812                            (None, None) => {
813                                fail!((
814                                    ErrorKind::InvalidClientConfig,
815                                    "could not resolve to any addresses"
816                                ));
817                            }
818                        }
819                    }
820                };
821
822                ActualConnection::TcpRustls(Box::new(TcpRustlsConnection { reader, open: true }))
823            }
824            #[cfg(not(any(feature = "tls-native-tls", feature = "tls-rustls")))]
825            ConnectionAddr::TcpTls { .. } => {
826                fail!((
827                    ErrorKind::InvalidClientConfig,
828                    "Cannot connect to TCP with TLS without the tls feature"
829                ));
830            }
831            #[cfg(unix)]
832            ConnectionAddr::Unix(ref path) => ActualConnection::Unix(UnixConnection {
833                sock: UnixStream::connect(path)?,
834                open: true,
835            }),
836            #[cfg(not(unix))]
837            ConnectionAddr::Unix(ref _path) => {
838                fail!((
839                    ErrorKind::InvalidClientConfig,
840                    "Cannot connect to unix sockets \
841                     on this platform"
842                ));
843            }
844        })
845    }
846
847    pub fn send_bytes(&mut self, bytes: &[u8]) -> RedisResult<Value> {
848        match *self {
849            ActualConnection::Tcp(ref mut connection) => {
850                let res = connection.reader.write_all(bytes).map_err(RedisError::from);
851                match res {
852                    Err(e) => {
853                        if e.is_unrecoverable_error() {
854                            connection.open = false;
855                        }
856                        Err(e)
857                    }
858                    Ok(_) => Ok(Value::Okay),
859                }
860            }
861            #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
862            ActualConnection::TcpNativeTls(ref mut connection) => {
863                let res = connection.reader.write_all(bytes).map_err(RedisError::from);
864                match res {
865                    Err(e) => {
866                        if e.is_unrecoverable_error() {
867                            connection.open = false;
868                        }
869                        Err(e)
870                    }
871                    Ok(_) => Ok(Value::Okay),
872                }
873            }
874            #[cfg(feature = "tls-rustls")]
875            ActualConnection::TcpRustls(ref mut connection) => {
876                let res = connection.reader.write_all(bytes).map_err(RedisError::from);
877                match res {
878                    Err(e) => {
879                        if e.is_unrecoverable_error() {
880                            connection.open = false;
881                        }
882                        Err(e)
883                    }
884                    Ok(_) => Ok(Value::Okay),
885                }
886            }
887            #[cfg(unix)]
888            ActualConnection::Unix(ref mut connection) => {
889                let result = connection.sock.write_all(bytes).map_err(RedisError::from);
890                match result {
891                    Err(e) => {
892                        if e.is_unrecoverable_error() {
893                            connection.open = false;
894                        }
895                        Err(e)
896                    }
897                    Ok(_) => Ok(Value::Okay),
898                }
899            }
900        }
901    }
902
903    pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
904        match *self {
905            ActualConnection::Tcp(TcpConnection { ref reader, .. }) => {
906                reader.set_write_timeout(dur)?;
907            }
908            #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
909            ActualConnection::TcpNativeTls(ref boxed_tls_connection) => {
910                let reader = &(boxed_tls_connection.reader);
911                reader.get_ref().set_write_timeout(dur)?;
912            }
913            #[cfg(feature = "tls-rustls")]
914            ActualConnection::TcpRustls(ref boxed_tls_connection) => {
915                let reader = &(boxed_tls_connection.reader);
916                reader.get_ref().set_write_timeout(dur)?;
917            }
918            #[cfg(unix)]
919            ActualConnection::Unix(UnixConnection { ref sock, .. }) => {
920                sock.set_write_timeout(dur)?;
921            }
922        }
923        Ok(())
924    }
925
926    pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
927        match *self {
928            ActualConnection::Tcp(TcpConnection { ref reader, .. }) => {
929                reader.set_read_timeout(dur)?;
930            }
931            #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
932            ActualConnection::TcpNativeTls(ref boxed_tls_connection) => {
933                let reader = &(boxed_tls_connection.reader);
934                reader.get_ref().set_read_timeout(dur)?;
935            }
936            #[cfg(feature = "tls-rustls")]
937            ActualConnection::TcpRustls(ref boxed_tls_connection) => {
938                let reader = &(boxed_tls_connection.reader);
939                reader.get_ref().set_read_timeout(dur)?;
940            }
941            #[cfg(unix)]
942            ActualConnection::Unix(UnixConnection { ref sock, .. }) => {
943                sock.set_read_timeout(dur)?;
944            }
945        }
946        Ok(())
947    }
948
949    pub fn is_open(&self) -> bool {
950        match *self {
951            ActualConnection::Tcp(TcpConnection { open, .. }) => open,
952            #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
953            ActualConnection::TcpNativeTls(ref boxed_tls_connection) => boxed_tls_connection.open,
954            #[cfg(feature = "tls-rustls")]
955            ActualConnection::TcpRustls(ref boxed_tls_connection) => boxed_tls_connection.open,
956            #[cfg(unix)]
957            ActualConnection::Unix(UnixConnection { open, .. }) => open,
958        }
959    }
960}
961
962#[cfg(feature = "tls-rustls")]
963pub(crate) fn create_rustls_config(
964    insecure: bool,
965    tls_params: Option<TlsConnParams>,
966) -> RedisResult<rustls::ClientConfig> {
967    #[allow(unused_mut)]
968    let mut root_store = RootCertStore::empty();
969    #[cfg(feature = "tls-rustls-webpki-roots")]
970    root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
971    #[cfg(all(
972        feature = "tls-rustls",
973        not(feature = "tls-native-tls"),
974        not(feature = "tls-rustls-webpki-roots")
975    ))]
976    {
977        let mut certificate_result = load_native_certs();
978        if let Some(error) = certificate_result.errors.pop() {
979            return Err(error.into());
980        }
981        for cert in certificate_result.certs {
982            root_store.add(cert)?;
983        }
984    }
985
986    let config = rustls::ClientConfig::builder();
987    let config = if let Some(tls_params) = tls_params {
988        let root_cert_store = tls_params.root_cert_store.unwrap_or(root_store);
989        let config_builder = config.with_root_certificates(root_cert_store.clone());
990
991        let config_builder = if let Some(ClientTlsParams {
992            client_cert_chain: client_cert,
993            client_key,
994        }) = tls_params.client_tls_params
995        {
996            config_builder
997                .with_client_auth_cert(client_cert, client_key)
998                .map_err(|err| {
999                    RedisError::from((
1000                        ErrorKind::InvalidClientConfig,
1001                        "Unable to build client with TLS parameters provided.",
1002                        err.to_string(),
1003                    ))
1004                })?
1005        } else {
1006            config_builder.with_no_client_auth()
1007        };
1008
1009        // Implement `danger_accept_invalid_hostnames`.
1010        //
1011        // The strange cfg here is to handle a specific unusual combination of features: if
1012        // `tls-native-tls` and `tls-rustls` are enabled, but `tls-rustls-insecure` is not, and the
1013        // application tries to use the danger flag.
1014        #[cfg(any(feature = "tls-rustls-insecure", feature = "tls-native-tls"))]
1015        let config_builder = if !insecure && tls_params.danger_accept_invalid_hostnames {
1016            #[cfg(not(feature = "tls-rustls-insecure"))]
1017            {
1018                // This code should not enable an insecure mode if the `insecure` feature is not
1019                // set, but it shouldn't silently ignore the flag either. So return an error.
1020                fail!((
1021                    ErrorKind::InvalidClientConfig,
1022                    "Cannot create insecure client via danger_accept_invalid_hostnames without tls-rustls-insecure feature"
1023                ));
1024            }
1025
1026            #[cfg(feature = "tls-rustls-insecure")]
1027            {
1028                let mut config = config_builder;
1029                config.dangerous().set_certificate_verifier(Arc::new(
1030                    AcceptInvalidHostnamesCertVerifier {
1031                        inner: rustls::client::WebPkiServerVerifier::builder(Arc::new(
1032                            root_cert_store,
1033                        ))
1034                        .build()
1035                        .map_err(|err| rustls::Error::from(rustls::OtherError(Arc::new(err))))?,
1036                    },
1037                ));
1038                config
1039            }
1040        } else {
1041            config_builder
1042        };
1043
1044        config_builder
1045    } else {
1046        config
1047            .with_root_certificates(root_store)
1048            .with_no_client_auth()
1049    };
1050
1051    match (insecure, cfg!(feature = "tls-rustls-insecure")) {
1052        #[cfg(feature = "tls-rustls-insecure")]
1053        (true, true) => {
1054            let mut config = config;
1055            config.enable_sni = false;
1056            config
1057                .dangerous()
1058                .set_certificate_verifier(Arc::new(NoCertificateVerification {
1059                    supported: rustls::crypto::ring::default_provider()
1060                        .signature_verification_algorithms,
1061                }));
1062
1063            Ok(config)
1064        }
1065        (true, false) => {
1066            fail!((
1067                ErrorKind::InvalidClientConfig,
1068                "Cannot create insecure client without tls-rustls-insecure feature"
1069            ));
1070        }
1071        _ => Ok(config),
1072    }
1073}
1074
1075fn authenticate_cmd(
1076    connection_info: &RedisConnectionInfo,
1077    check_username: bool,
1078    password: &str,
1079) -> Cmd {
1080    let mut command = cmd("AUTH");
1081    if check_username {
1082        if let Some(username) = &connection_info.username {
1083            command.arg(username);
1084        }
1085    }
1086    command.arg(password);
1087    command
1088}
1089
1090pub fn connect(
1091    connection_info: &ConnectionInfo,
1092    timeout: Option<Duration>,
1093) -> RedisResult<Connection> {
1094    let start = Instant::now();
1095    let con: ActualConnection = ActualConnection::new(&connection_info.addr, timeout)?;
1096
1097    // we temporarily set the timeout, and will remove it after finishing setup.
1098    let remaining_timeout = timeout.and_then(|timeout| timeout.checked_sub(start.elapsed()));
1099    // TLS could run logic that doesn't contain a timeout, and should fail if it takes too long.
1100    if timeout.is_some() && remaining_timeout.is_none() {
1101        return Err(RedisError::from(std::io::Error::new(
1102            std::io::ErrorKind::TimedOut,
1103            "Connection timed out",
1104        )));
1105    }
1106    con.set_read_timeout(remaining_timeout)?;
1107    con.set_write_timeout(remaining_timeout)?;
1108
1109    let con = setup_connection(
1110        con,
1111        &connection_info.redis,
1112        #[cfg(feature = "cache-aio")]
1113        None,
1114    )?;
1115
1116    // remove the temporary timeout.
1117    con.set_read_timeout(None)?;
1118    con.set_write_timeout(None)?;
1119
1120    Ok(con)
1121}
1122
1123pub(crate) struct ConnectionSetupComponents {
1124    resp3_auth_cmd_idx: Option<usize>,
1125    resp2_auth_cmd_idx: Option<usize>,
1126    select_cmd_idx: Option<usize>,
1127    #[cfg(feature = "cache-aio")]
1128    cache_cmd_idx: Option<usize>,
1129}
1130
1131pub(crate) fn connection_setup_pipeline(
1132    connection_info: &RedisConnectionInfo,
1133    check_username: bool,
1134    #[cfg(feature = "cache-aio")] cache_config: Option<crate::caching::CacheConfig>,
1135) -> (crate::Pipeline, ConnectionSetupComponents) {
1136    let mut last_cmd_index = 0;
1137
1138    let mut get_next_command_index = |condition| {
1139        if condition {
1140            last_cmd_index += 1;
1141            Some(last_cmd_index - 1)
1142        } else {
1143            None
1144        }
1145    };
1146
1147    let authenticate_with_resp3_cmd_index =
1148        get_next_command_index(connection_info.protocol != ProtocolVersion::RESP2);
1149    let authenticate_with_resp2_cmd_index = get_next_command_index(
1150        authenticate_with_resp3_cmd_index.is_none() && connection_info.password.is_some(),
1151    );
1152    let select_db_cmd_index = get_next_command_index(connection_info.db != 0);
1153    #[cfg(feature = "cache-aio")]
1154    let cache_cmd_index = get_next_command_index(
1155        connection_info.protocol != ProtocolVersion::RESP2 && cache_config.is_some(),
1156    );
1157
1158    let mut pipeline = pipe();
1159
1160    if authenticate_with_resp3_cmd_index.is_some() {
1161        pipeline.add_command(resp3_hello(connection_info));
1162    } else if authenticate_with_resp2_cmd_index.is_some() {
1163        pipeline.add_command(authenticate_cmd(
1164            connection_info,
1165            check_username,
1166            connection_info.password.as_ref().unwrap(),
1167        ));
1168    }
1169
1170    if select_db_cmd_index.is_some() {
1171        pipeline.cmd("SELECT").arg(connection_info.db);
1172    }
1173
1174    // result is ignored, as per the command's instructions.
1175    // https://redis.io/commands/client-setinfo/
1176    #[cfg(not(feature = "disable-client-setinfo"))]
1177    pipeline
1178        .cmd("CLIENT")
1179        .arg("SETINFO")
1180        .arg("LIB-NAME")
1181        .arg("redis-rs")
1182        .ignore();
1183    #[cfg(not(feature = "disable-client-setinfo"))]
1184    pipeline
1185        .cmd("CLIENT")
1186        .arg("SETINFO")
1187        .arg("LIB-VER")
1188        .arg(env!("CARGO_PKG_VERSION"))
1189        .ignore();
1190
1191    #[cfg(feature = "cache-aio")]
1192    if cache_cmd_index.is_some() {
1193        let cache_config = cache_config.expect(
1194            "It's expected to have cache_config if cache_cmd_index is Some, please create an issue about this.",
1195        );
1196        pipeline.cmd("CLIENT").arg("TRACKING").arg("ON");
1197        match cache_config.mode {
1198            crate::caching::CacheMode::All => {}
1199            crate::caching::CacheMode::OptIn => {
1200                pipeline.arg("OPTIN");
1201            }
1202        }
1203    }
1204
1205    (
1206        pipeline,
1207        ConnectionSetupComponents {
1208            resp3_auth_cmd_idx: authenticate_with_resp3_cmd_index,
1209            resp2_auth_cmd_idx: authenticate_with_resp2_cmd_index,
1210            select_cmd_idx: select_db_cmd_index,
1211            #[cfg(feature = "cache-aio")]
1212            cache_cmd_idx: cache_cmd_index,
1213        },
1214    )
1215}
1216
1217fn check_resp3_auth(result: &Value) -> RedisResult<()> {
1218    if let Value::ServerError(err) = result {
1219        return Err(get_resp3_hello_command_error(err.clone().into()));
1220    }
1221    Ok(())
1222}
1223
1224#[derive(PartialEq)]
1225pub(crate) enum AuthResult {
1226    Succeeded,
1227    ShouldRetryWithoutUsername,
1228}
1229
1230fn check_resp2_auth(result: &Value) -> RedisResult<AuthResult> {
1231    let err = match result {
1232        Value::Okay => {
1233            return Ok(AuthResult::Succeeded);
1234        }
1235        Value::ServerError(err) => err,
1236        _ => {
1237            return Err((
1238                ErrorKind::ResponseError,
1239                "Redis server refused to authenticate, returns Ok() != Value::Okay",
1240            )
1241                .into());
1242        }
1243    };
1244
1245    let err_msg = err.details().ok_or((
1246        ErrorKind::AuthenticationFailed,
1247        "Password authentication failed",
1248    ))?;
1249    if !err_msg.contains("wrong number of arguments for 'auth' command") {
1250        return Err((
1251            ErrorKind::AuthenticationFailed,
1252            "Password authentication failed",
1253        )
1254            .into());
1255    }
1256    Ok(AuthResult::ShouldRetryWithoutUsername)
1257}
1258
1259fn check_db_select(value: &Value) -> RedisResult<()> {
1260    let Value::ServerError(err) = value else {
1261        return Ok(());
1262    };
1263
1264    match err.details() {
1265        Some(err_msg) => Err((
1266            ErrorKind::ResponseError,
1267            "Redis server refused to switch database",
1268            err_msg.to_string(),
1269        )
1270            .into()),
1271        None => Err((
1272            ErrorKind::ResponseError,
1273            "Redis server refused to switch database",
1274        )
1275            .into()),
1276    }
1277}
1278
1279#[cfg(feature = "cache-aio")]
1280fn check_caching(result: &Value) -> RedisResult<()> {
1281    match result {
1282        Value::Okay => Ok(()),
1283        _ => Err((
1284            ErrorKind::ResponseError,
1285            "Client-side caching returned unknown response",
1286        )
1287            .into()),
1288    }
1289}
1290
1291pub(crate) fn check_connection_setup(
1292    results: Vec<Value>,
1293    ConnectionSetupComponents {
1294        resp3_auth_cmd_idx,
1295        resp2_auth_cmd_idx,
1296        select_cmd_idx,
1297        #[cfg(feature = "cache-aio")]
1298        cache_cmd_idx,
1299    }: ConnectionSetupComponents,
1300) -> RedisResult<AuthResult> {
1301    // can't have both values set
1302    assert!(!(resp2_auth_cmd_idx.is_some() && resp3_auth_cmd_idx.is_some()));
1303
1304    if let Some(index) = resp3_auth_cmd_idx {
1305        let Some(value) = results.get(index) else {
1306            return Err((ErrorKind::ClientError, "Missing RESP3 auth response").into());
1307        };
1308        check_resp3_auth(value)?;
1309    } else if let Some(index) = resp2_auth_cmd_idx {
1310        let Some(value) = results.get(index) else {
1311            return Err((ErrorKind::ClientError, "Missing RESP2 auth response").into());
1312        };
1313        if check_resp2_auth(value)? == AuthResult::ShouldRetryWithoutUsername {
1314            return Ok(AuthResult::ShouldRetryWithoutUsername);
1315        }
1316    }
1317
1318    if let Some(index) = select_cmd_idx {
1319        let Some(value) = results.get(index) else {
1320            return Err((ErrorKind::ClientError, "Missing SELECT DB response").into());
1321        };
1322        check_db_select(value)?;
1323    }
1324
1325    #[cfg(feature = "cache-aio")]
1326    if let Some(index) = cache_cmd_idx {
1327        let Some(value) = results.get(index) else {
1328            return Err((ErrorKind::ClientError, "Missing Caching response").into());
1329        };
1330        check_caching(value)?;
1331    }
1332
1333    Ok(AuthResult::Succeeded)
1334}
1335
1336fn execute_connection_pipeline(
1337    rv: &mut Connection,
1338    (pipeline, instructions): (crate::Pipeline, ConnectionSetupComponents),
1339) -> RedisResult<AuthResult> {
1340    if pipeline.is_empty() {
1341        return Ok(AuthResult::Succeeded);
1342    }
1343    let results = rv.req_packed_commands(&pipeline.get_packed_pipeline(), 0, pipeline.len())?;
1344
1345    check_connection_setup(results, instructions)
1346}
1347
1348fn setup_connection(
1349    con: ActualConnection,
1350    connection_info: &RedisConnectionInfo,
1351    #[cfg(feature = "cache-aio")] cache_config: Option<crate::caching::CacheConfig>,
1352) -> RedisResult<Connection> {
1353    let mut rv = Connection {
1354        con,
1355        parser: Parser::new(),
1356        db: connection_info.db,
1357        pubsub: false,
1358        protocol: connection_info.protocol,
1359        push_sender: None,
1360        messages_to_skip: 0,
1361    };
1362
1363    if execute_connection_pipeline(
1364        &mut rv,
1365        connection_setup_pipeline(
1366            connection_info,
1367            true,
1368            #[cfg(feature = "cache-aio")]
1369            cache_config,
1370        ),
1371    )? == AuthResult::ShouldRetryWithoutUsername
1372    {
1373        execute_connection_pipeline(
1374            &mut rv,
1375            connection_setup_pipeline(
1376                connection_info,
1377                false,
1378                #[cfg(feature = "cache-aio")]
1379                cache_config,
1380            ),
1381        )?;
1382    }
1383
1384    Ok(rv)
1385}
1386
1387/// Implements the "stateless" part of the connection interface that is used by the
1388/// different objects in redis-rs.
1389///
1390/// Primarily it obviously applies to `Connection` object but also some other objects
1391///  implement the interface (for instance whole clients or certain redis results).
1392///
1393/// Generally clients and connections (as well as redis results of those) implement
1394/// this trait.  Actual connections provide more functionality which can be used
1395/// to implement things like `PubSub` but they also can modify the intrinsic
1396/// state of the TCP connection.  This is not possible with `ConnectionLike`
1397/// implementors because that functionality is not exposed.
1398pub trait ConnectionLike {
1399    /// Sends an already encoded (packed) command into the TCP socket and
1400    /// reads the single response from it.
1401    fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value>;
1402
1403    /// Sends multiple already encoded (packed) command into the TCP socket
1404    /// and reads `count` responses from it.  This is used to implement
1405    /// pipelining.
1406    /// Important - this function is meant for internal usage, since it's
1407    /// easy to pass incorrect `offset` & `count` parameters, which might
1408    /// cause the connection to enter an erroneous state. Users shouldn't
1409    /// call it, instead using the Pipeline::query function.
1410    #[doc(hidden)]
1411    fn req_packed_commands(
1412        &mut self,
1413        cmd: &[u8],
1414        offset: usize,
1415        count: usize,
1416    ) -> RedisResult<Vec<Value>>;
1417
1418    /// Sends a [Cmd] into the TCP socket and reads a single response from it.
1419    fn req_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
1420        let pcmd = cmd.get_packed_command();
1421        self.req_packed_command(&pcmd)
1422    }
1423
1424    /// Returns the database this connection is bound to.  Note that this
1425    /// information might be unreliable because it's initially cached and
1426    /// also might be incorrect if the connection like object is not
1427    /// actually connected.
1428    fn get_db(&self) -> i64;
1429
1430    /// Does this connection support pipelining?
1431    #[doc(hidden)]
1432    fn supports_pipelining(&self) -> bool {
1433        true
1434    }
1435
1436    /// Check that all connections it has are available (`PING` internally).
1437    fn check_connection(&mut self) -> bool;
1438
1439    /// Returns the connection status.
1440    ///
1441    /// The connection is open until any `read` call received an
1442    /// invalid response from the server (most likely a closed or dropped
1443    /// connection, otherwise a Redis protocol error). When using unix
1444    /// sockets the connection is open until writing a command failed with a
1445    /// `BrokenPipe` error.
1446    fn is_open(&self) -> bool;
1447}
1448
1449/// A connection is an object that represents a single redis connection.  It
1450/// provides basic support for sending encoded commands into a redis connection
1451/// and to read a response from it.  It's bound to a single database and can
1452/// only be created from the client.
1453///
1454/// You generally do not much with this object other than passing it to
1455/// `Cmd` objects.
1456impl Connection {
1457    /// Sends an already encoded (packed) command into the TCP socket and
1458    /// does not read a response.  This is useful for commands like
1459    /// `MONITOR` which yield multiple items.  This needs to be used with
1460    /// care because it changes the state of the connection.
1461    pub fn send_packed_command(&mut self, cmd: &[u8]) -> RedisResult<()> {
1462        self.send_bytes(cmd)?;
1463        Ok(())
1464    }
1465
1466    /// Fetches a single response from the connection.  This is useful
1467    /// if used in combination with `send_packed_command`.
1468    pub fn recv_response(&mut self) -> RedisResult<Value> {
1469        self.read(true)
1470    }
1471
1472    /// Sets the write timeout for the connection.
1473    ///
1474    /// If the provided value is `None`, then `send_packed_command` call will
1475    /// block indefinitely. It is an error to pass the zero `Duration` to this
1476    /// method.
1477    pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
1478        self.con.set_write_timeout(dur)
1479    }
1480
1481    /// Sets the read timeout for the connection.
1482    ///
1483    /// If the provided value is `None`, then `recv_response` call will
1484    /// block indefinitely. It is an error to pass the zero `Duration` to this
1485    /// method.
1486    pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
1487        self.con.set_read_timeout(dur)
1488    }
1489
1490    /// Creates a [`PubSub`] instance for this connection.
1491    pub fn as_pubsub(&mut self) -> PubSub<'_> {
1492        // NOTE: The pubsub flag is intentionally not raised at this time since
1493        // running commands within the pubsub state should not try and exit from
1494        // the pubsub state.
1495        PubSub::new(self)
1496    }
1497
1498    fn exit_pubsub(&mut self) -> RedisResult<()> {
1499        let res = self.clear_active_subscriptions();
1500        if res.is_ok() {
1501            self.pubsub = false;
1502        } else {
1503            // Raise the pubsub flag to indicate the connection is "stuck" in that state.
1504            self.pubsub = true;
1505        }
1506
1507        res
1508    }
1509
1510    /// Get the inner connection out of a PubSub
1511    ///
1512    /// Any active subscriptions are unsubscribed. In the event of an error, the connection is
1513    /// dropped.
1514    fn clear_active_subscriptions(&mut self) -> RedisResult<()> {
1515        // Responses to unsubscribe commands return in a 3-tuple with values
1516        // ("unsubscribe" or "punsubscribe", name of subscription removed, count of remaining subs).
1517        // The "count of remaining subs" includes both pattern subscriptions and non pattern
1518        // subscriptions. Thus, to accurately drain all unsubscribe messages received from the
1519        // server, both commands need to be executed at once.
1520        {
1521            // Prepare both unsubscribe commands
1522            let unsubscribe = cmd("UNSUBSCRIBE").get_packed_command();
1523            let punsubscribe = cmd("PUNSUBSCRIBE").get_packed_command();
1524
1525            // Execute commands
1526            self.send_bytes(&unsubscribe)?;
1527            self.send_bytes(&punsubscribe)?;
1528        }
1529
1530        // Receive responses
1531        //
1532        // There will be at minimum two responses - 1 for each of punsubscribe and unsubscribe
1533        // commands. There may be more responses if there are active subscriptions. In this case,
1534        // messages are received until the _subscription count_ in the responses reach zero.
1535        let mut received_unsub = false;
1536        let mut received_punsub = false;
1537
1538        loop {
1539            let resp = self.recv_response()?;
1540
1541            match resp {
1542                Value::Push { kind, data } => {
1543                    if data.len() >= 2 {
1544                        if let Value::Int(num) = data[1] {
1545                            if resp3_is_pub_sub_state_cleared(
1546                                &mut received_unsub,
1547                                &mut received_punsub,
1548                                &kind,
1549                                num as isize,
1550                            ) {
1551                                break;
1552                            }
1553                        }
1554                    }
1555                }
1556                Value::ServerError(err) => {
1557                    // a new error behavior, introduced in valkey 8.
1558                    // https://github.com/valkey-io/valkey/pull/759
1559                    if err.kind() == Some(ServerErrorKind::NoSub) {
1560                        if no_sub_err_is_pub_sub_state_cleared(
1561                            &mut received_unsub,
1562                            &mut received_punsub,
1563                            &err,
1564                        ) {
1565                            break;
1566                        } else {
1567                            continue;
1568                        }
1569                    }
1570
1571                    return Err(err.into());
1572                }
1573                Value::Array(vec) => {
1574                    let res: (Vec<u8>, (), isize) = from_owned_redis_value(Value::Array(vec))?;
1575                    if resp2_is_pub_sub_state_cleared(
1576                        &mut received_unsub,
1577                        &mut received_punsub,
1578                        &res.0,
1579                        res.2,
1580                    ) {
1581                        break;
1582                    }
1583                }
1584                _ => {
1585                    return Err((
1586                        ErrorKind::ClientError,
1587                        "Unexpected unsubscribe response",
1588                        format!("{resp:?}"),
1589                    )
1590                        .into())
1591                }
1592            }
1593        }
1594
1595        // Finally, the connection is back in its normal state since all subscriptions were
1596        // cancelled *and* all unsubscribe messages were received.
1597        Ok(())
1598    }
1599
1600    fn send_push(&self, push: PushInfo) {
1601        if let Some(sender) = &self.push_sender {
1602            let _ = sender.send(push);
1603        }
1604    }
1605
1606    fn try_send(&self, value: &RedisResult<Value>) {
1607        if let Ok(Value::Push { kind, data }) = value {
1608            self.send_push(PushInfo {
1609                kind: kind.clone(),
1610                data: data.clone(),
1611            });
1612        }
1613    }
1614
1615    fn send_disconnect(&self) {
1616        self.send_push(PushInfo::disconnect())
1617    }
1618
1619    fn close_connection(&mut self) {
1620        // Notify the PushManager that the connection was lost
1621        self.send_disconnect();
1622        match self.con {
1623            ActualConnection::Tcp(ref mut connection) => {
1624                let _ = connection.reader.shutdown(net::Shutdown::Both);
1625                connection.open = false;
1626            }
1627            #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
1628            ActualConnection::TcpNativeTls(ref mut connection) => {
1629                let _ = connection.reader.shutdown();
1630                connection.open = false;
1631            }
1632            #[cfg(feature = "tls-rustls")]
1633            ActualConnection::TcpRustls(ref mut connection) => {
1634                let _ = connection.reader.get_mut().shutdown(net::Shutdown::Both);
1635                connection.open = false;
1636            }
1637            #[cfg(unix)]
1638            ActualConnection::Unix(ref mut connection) => {
1639                let _ = connection.sock.shutdown(net::Shutdown::Both);
1640                connection.open = false;
1641            }
1642        }
1643    }
1644
1645    /// Fetches a single message from the connection. If the message is a response,
1646    /// increment `messages_to_skip` if it wasn't received before a timeout.
1647    fn read(&mut self, is_response: bool) -> RedisResult<Value> {
1648        loop {
1649            let result = match self.con {
1650                ActualConnection::Tcp(TcpConnection { ref mut reader, .. }) => {
1651                    self.parser.parse_value(reader)
1652                }
1653                #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
1654                ActualConnection::TcpNativeTls(ref mut boxed_tls_connection) => {
1655                    let reader = &mut boxed_tls_connection.reader;
1656                    self.parser.parse_value(reader)
1657                }
1658                #[cfg(feature = "tls-rustls")]
1659                ActualConnection::TcpRustls(ref mut boxed_tls_connection) => {
1660                    let reader = &mut boxed_tls_connection.reader;
1661                    self.parser.parse_value(reader)
1662                }
1663                #[cfg(unix)]
1664                ActualConnection::Unix(UnixConnection { ref mut sock, .. }) => {
1665                    self.parser.parse_value(sock)
1666                }
1667            };
1668            self.try_send(&result);
1669
1670            let Err(err) = &result else {
1671                if self.messages_to_skip > 0 {
1672                    self.messages_to_skip -= 1;
1673                    continue;
1674                }
1675                return result;
1676            };
1677            let Some(io_error) = err.as_io_error() else {
1678                if self.messages_to_skip > 0 {
1679                    self.messages_to_skip -= 1;
1680                    continue;
1681                }
1682                return result;
1683            };
1684            // shutdown connection on protocol error
1685            if io_error.kind() == io::ErrorKind::UnexpectedEof {
1686                self.close_connection();
1687            } else if is_response {
1688                self.messages_to_skip += 1;
1689            }
1690
1691            return result;
1692        }
1693    }
1694
1695    /// Sets sender channel for push values.
1696    pub fn set_push_sender(&mut self, sender: SyncPushSender) {
1697        self.push_sender = Some(sender);
1698    }
1699
1700    fn send_bytes(&mut self, bytes: &[u8]) -> RedisResult<Value> {
1701        let result = self.con.send_bytes(bytes);
1702        if self.protocol != ProtocolVersion::RESP2 {
1703            if let Err(e) = &result {
1704                if e.is_connection_dropped() {
1705                    self.send_disconnect();
1706                }
1707            }
1708        }
1709        result
1710    }
1711}
1712
1713impl ConnectionLike for Connection {
1714    /// Sends a [Cmd] into the TCP socket and reads a single response from it.
1715    fn req_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
1716        let pcmd = cmd.get_packed_command();
1717        if self.pubsub {
1718            self.exit_pubsub()?;
1719        }
1720
1721        self.send_bytes(&pcmd)?;
1722        if cmd.is_no_response() {
1723            return Ok(Value::Nil);
1724        }
1725        loop {
1726            match self.read(true)? {
1727                Value::Push {
1728                    kind: _kind,
1729                    data: _data,
1730                } => continue,
1731                val => return Ok(val),
1732            }
1733        }
1734    }
1735    fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
1736        if self.pubsub {
1737            self.exit_pubsub()?;
1738        }
1739
1740        self.send_bytes(cmd)?;
1741        loop {
1742            match self.read(true)? {
1743                Value::Push {
1744                    kind: _kind,
1745                    data: _data,
1746                } => continue,
1747                val => return Ok(val),
1748            }
1749        }
1750    }
1751
1752    fn req_packed_commands(
1753        &mut self,
1754        cmd: &[u8],
1755        offset: usize,
1756        count: usize,
1757    ) -> RedisResult<Vec<Value>> {
1758        if self.pubsub {
1759            self.exit_pubsub()?;
1760        }
1761        self.send_bytes(cmd)?;
1762        let mut rv = vec![];
1763        let mut first_err = None;
1764        let mut count = count;
1765        let mut idx = 0;
1766        while idx < (offset + count) {
1767            // When processing a transaction, some responses may be errors.
1768            // We need to keep processing the rest of the responses in that case,
1769            // so bailing early with `?` would not be correct.
1770            // See: https://github.com/redis-rs/redis-rs/issues/436
1771            let response = self.read(true);
1772            match response {
1773                Ok(Value::ServerError(err)) => {
1774                    if idx < offset {
1775                        if first_err.is_none() {
1776                            first_err = Some(err.into());
1777                        }
1778                    } else {
1779                        rv.push(Value::ServerError(err));
1780                    }
1781                }
1782                Ok(item) => {
1783                    // RESP3 can insert push data between command replies
1784                    if let Value::Push {
1785                        kind: _kind,
1786                        data: _data,
1787                    } = item
1788                    {
1789                        // if that is the case we have to extend the loop and handle push data
1790                        count += 1;
1791                    } else if idx >= offset {
1792                        rv.push(item);
1793                    }
1794                }
1795                Err(err) => {
1796                    if first_err.is_none() {
1797                        first_err = Some(err);
1798                    }
1799                }
1800            }
1801            idx += 1;
1802        }
1803
1804        first_err.map_or(Ok(rv), Err)
1805    }
1806
1807    fn get_db(&self) -> i64 {
1808        self.db
1809    }
1810
1811    fn check_connection(&mut self) -> bool {
1812        cmd("PING").query::<String>(self).is_ok()
1813    }
1814
1815    fn is_open(&self) -> bool {
1816        self.con.is_open()
1817    }
1818}
1819
1820impl<C, T> ConnectionLike for T
1821where
1822    C: ConnectionLike,
1823    T: DerefMut<Target = C>,
1824{
1825    fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
1826        self.deref_mut().req_packed_command(cmd)
1827    }
1828
1829    fn req_packed_commands(
1830        &mut self,
1831        cmd: &[u8],
1832        offset: usize,
1833        count: usize,
1834    ) -> RedisResult<Vec<Value>> {
1835        self.deref_mut().req_packed_commands(cmd, offset, count)
1836    }
1837
1838    fn req_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
1839        self.deref_mut().req_command(cmd)
1840    }
1841
1842    fn get_db(&self) -> i64 {
1843        self.deref().get_db()
1844    }
1845
1846    fn supports_pipelining(&self) -> bool {
1847        self.deref().supports_pipelining()
1848    }
1849
1850    fn check_connection(&mut self) -> bool {
1851        self.deref_mut().check_connection()
1852    }
1853
1854    fn is_open(&self) -> bool {
1855        self.deref().is_open()
1856    }
1857}
1858
1859/// The pubsub object provides convenient access to the redis pubsub
1860/// system.  Once created you can subscribe and unsubscribe from channels
1861/// and listen in on messages.
1862///
1863/// Example:
1864///
1865/// ```rust,no_run
1866/// # fn do_something() -> redis::RedisResult<()> {
1867/// let client = redis::Client::open("redis://127.0.0.1/")?;
1868/// let mut con = client.get_connection()?;
1869/// let mut pubsub = con.as_pubsub();
1870/// pubsub.subscribe("channel_1")?;
1871/// pubsub.subscribe("channel_2")?;
1872///
1873/// loop {
1874///     let msg = pubsub.get_message()?;
1875///     let payload : String = msg.get_payload()?;
1876///     println!("channel '{}': {}", msg.get_channel_name(), payload);
1877/// }
1878/// # }
1879/// ```
1880impl<'a> PubSub<'a> {
1881    fn new(con: &'a mut Connection) -> Self {
1882        Self {
1883            con,
1884            waiting_messages: VecDeque::new(),
1885        }
1886    }
1887
1888    fn cache_messages_until_received_response(
1889        &mut self,
1890        cmd: &mut Cmd,
1891        is_sub_unsub: bool,
1892    ) -> RedisResult<Value> {
1893        let ignore_response = self.con.protocol != ProtocolVersion::RESP2 && is_sub_unsub;
1894        cmd.set_no_response(ignore_response);
1895
1896        self.con.send_packed_command(&cmd.get_packed_command())?;
1897
1898        loop {
1899            let response = self.con.recv_response()?;
1900            if let Some(msg) = Msg::from_value(&response) {
1901                self.waiting_messages.push_back(msg);
1902            } else {
1903                return Ok(response);
1904            }
1905        }
1906    }
1907
1908    /// Subscribes to a new channel(s).    
1909    pub fn subscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
1910        self.cache_messages_until_received_response(cmd("SUBSCRIBE").arg(channel), true)?;
1911        Ok(())
1912    }
1913
1914    /// Subscribes to new channel(s) with pattern(s).
1915    pub fn psubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
1916        self.cache_messages_until_received_response(cmd("PSUBSCRIBE").arg(pchannel), true)?;
1917        Ok(())
1918    }
1919
1920    /// Unsubscribes from a channel(s).
1921    pub fn unsubscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
1922        self.cache_messages_until_received_response(cmd("UNSUBSCRIBE").arg(channel), true)?;
1923        Ok(())
1924    }
1925
1926    /// Unsubscribes from channel pattern(s).
1927    pub fn punsubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
1928        self.cache_messages_until_received_response(cmd("PUNSUBSCRIBE").arg(pchannel), true)?;
1929        Ok(())
1930    }
1931
1932    /// Sends a ping with a message to the server
1933    pub fn ping_message<T: FromRedisValue>(&mut self, message: impl ToRedisArgs) -> RedisResult<T> {
1934        from_owned_redis_value(
1935            self.cache_messages_until_received_response(cmd("PING").arg(message), false)?,
1936        )
1937    }
1938    /// Sends a ping to the server
1939    pub fn ping<T: FromRedisValue>(&mut self) -> RedisResult<T> {
1940        from_owned_redis_value(
1941            self.cache_messages_until_received_response(&mut cmd("PING"), false)?,
1942        )
1943    }
1944
1945    /// Fetches the next message from the pubsub connection.  Blocks until
1946    /// a message becomes available.  This currently does not provide a
1947    /// wait not to block :(
1948    ///
1949    /// The message itself is still generic and can be converted into an
1950    /// appropriate type through the helper methods on it.
1951    pub fn get_message(&mut self) -> RedisResult<Msg> {
1952        if let Some(msg) = self.waiting_messages.pop_front() {
1953            return Ok(msg);
1954        }
1955        loop {
1956            if let Some(msg) = Msg::from_owned_value(self.con.read(false)?) {
1957                return Ok(msg);
1958            } else {
1959                continue;
1960            }
1961        }
1962    }
1963
1964    /// Sets the read timeout for the connection.
1965    ///
1966    /// If the provided value is `None`, then `get_message` call will
1967    /// block indefinitely. It is an error to pass the zero `Duration` to this
1968    /// method.
1969    pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
1970        self.con.set_read_timeout(dur)
1971    }
1972}
1973
1974impl Drop for PubSub<'_> {
1975    fn drop(&mut self) {
1976        let _ = self.con.exit_pubsub();
1977    }
1978}
1979
1980/// This holds the data that comes from listening to a pubsub
1981/// connection.  It only contains actual message data.
1982impl Msg {
1983    /// Tries to convert provided [`Value`] into [`Msg`].
1984    pub fn from_value(value: &Value) -> Option<Self> {
1985        Self::from_owned_value(value.clone())
1986    }
1987
1988    /// Tries to convert provided [`Value`] into [`Msg`].
1989    pub fn from_owned_value(value: Value) -> Option<Self> {
1990        let mut pattern = None;
1991        let payload;
1992        let channel;
1993
1994        if let Value::Push { kind, data } = value {
1995            return Self::from_push_info(PushInfo { kind, data });
1996        } else {
1997            let raw_msg: Vec<Value> = from_owned_redis_value(value).ok()?;
1998            let mut iter = raw_msg.into_iter();
1999            let msg_type: String = from_owned_redis_value(iter.next()?).ok()?;
2000            if msg_type == "message" {
2001                channel = iter.next()?;
2002                payload = iter.next()?;
2003            } else if msg_type == "pmessage" {
2004                pattern = Some(iter.next()?);
2005                channel = iter.next()?;
2006                payload = iter.next()?;
2007            } else {
2008                return None;
2009            }
2010        };
2011        Some(Msg {
2012            payload,
2013            channel,
2014            pattern,
2015        })
2016    }
2017
2018    /// Tries to convert provided [`PushInfo`] into [`Msg`].
2019    pub fn from_push_info(push_info: PushInfo) -> Option<Self> {
2020        let mut pattern = None;
2021        let payload;
2022        let channel;
2023
2024        let mut iter = push_info.data.into_iter();
2025        if push_info.kind == PushKind::Message || push_info.kind == PushKind::SMessage {
2026            channel = iter.next()?;
2027            payload = iter.next()?;
2028        } else if push_info.kind == PushKind::PMessage {
2029            pattern = Some(iter.next()?);
2030            channel = iter.next()?;
2031            payload = iter.next()?;
2032        } else {
2033            return None;
2034        }
2035
2036        Some(Msg {
2037            payload,
2038            channel,
2039            pattern,
2040        })
2041    }
2042
2043    /// Returns the channel this message came on.
2044    pub fn get_channel<T: FromRedisValue>(&self) -> RedisResult<T> {
2045        from_redis_value(&self.channel)
2046    }
2047
2048    /// Convenience method to get a string version of the channel.  Unless
2049    /// your channel contains non utf-8 bytes you can always use this
2050    /// method.  If the channel is not a valid string (which really should
2051    /// not happen) then the return value is `"?"`.
2052    pub fn get_channel_name(&self) -> &str {
2053        match self.channel {
2054            Value::BulkString(ref bytes) => from_utf8(bytes).unwrap_or("?"),
2055            _ => "?",
2056        }
2057    }
2058
2059    /// Returns the message's payload in a specific format.
2060    pub fn get_payload<T: FromRedisValue>(&self) -> RedisResult<T> {
2061        from_redis_value(&self.payload)
2062    }
2063
2064    /// Returns the bytes that are the message's payload.  This can be used
2065    /// as an alternative to the `get_payload` function if you are interested
2066    /// in the raw bytes in it.
2067    pub fn get_payload_bytes(&self) -> &[u8] {
2068        match self.payload {
2069            Value::BulkString(ref bytes) => bytes,
2070            _ => b"",
2071        }
2072    }
2073
2074    /// Returns true if the message was constructed from a pattern
2075    /// subscription.
2076    #[allow(clippy::wrong_self_convention)]
2077    pub fn from_pattern(&self) -> bool {
2078        self.pattern.is_some()
2079    }
2080
2081    /// If the message was constructed from a message pattern this can be
2082    /// used to find out which one.  It's recommended to match against
2083    /// an `Option<String>` so that you do not need to use `from_pattern`
2084    /// to figure out if a pattern was set.
2085    pub fn get_pattern<T: FromRedisValue>(&self) -> RedisResult<T> {
2086        match self.pattern {
2087            None => from_redis_value(&Value::Nil),
2088            Some(ref x) => from_redis_value(x),
2089        }
2090    }
2091}
2092
2093/// This function simplifies transaction management slightly.  What it
2094/// does is automatically watching keys and then going into a transaction
2095/// loop util it succeeds.  Once it goes through the results are
2096/// returned.
2097///
2098/// To use the transaction two pieces of information are needed: a list
2099/// of all the keys that need to be watched for modifications and a
2100/// closure with the code that should be execute in the context of the
2101/// transaction.  The closure is invoked with a fresh pipeline in atomic
2102/// mode.  To use the transaction the function needs to return the result
2103/// from querying the pipeline with the connection.
2104///
2105/// The end result of the transaction is then available as the return
2106/// value from the function call.
2107///
2108/// Example:
2109///
2110/// ```rust,no_run
2111/// use redis::Commands;
2112/// # fn do_something() -> redis::RedisResult<()> {
2113/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
2114/// # let mut con = client.get_connection().unwrap();
2115/// let key = "the_key";
2116/// let (new_val,) : (isize,) = redis::transaction(&mut con, &[key], |con, pipe| {
2117///     let old_val : isize = con.get(key)?;
2118///     pipe
2119///         .set(key, old_val + 1).ignore()
2120///         .get(key).query(con)
2121/// })?;
2122/// println!("The incremented number is: {}", new_val);
2123/// # Ok(()) }
2124/// ```
2125pub fn transaction<
2126    C: ConnectionLike,
2127    K: ToRedisArgs,
2128    T,
2129    F: FnMut(&mut C, &mut Pipeline) -> RedisResult<Option<T>>,
2130>(
2131    con: &mut C,
2132    keys: &[K],
2133    func: F,
2134) -> RedisResult<T> {
2135    let mut func = func;
2136    loop {
2137        cmd("WATCH").arg(keys).exec(con)?;
2138        let mut p = pipe();
2139        let response: Option<T> = func(con, p.atomic())?;
2140        match response {
2141            None => {
2142                continue;
2143            }
2144            Some(response) => {
2145                // make sure no watch is left in the connection, even if
2146                // someone forgot to use the pipeline.
2147                cmd("UNWATCH").exec(con)?;
2148                return Ok(response);
2149            }
2150        }
2151    }
2152}
2153//TODO: for both clearing logic support sharded channels.
2154
2155/// Common logic for clearing subscriptions in RESP2 async/sync
2156pub fn resp2_is_pub_sub_state_cleared(
2157    received_unsub: &mut bool,
2158    received_punsub: &mut bool,
2159    kind: &[u8],
2160    num: isize,
2161) -> bool {
2162    match kind.first() {
2163        Some(&b'u') => *received_unsub = true,
2164        Some(&b'p') => *received_punsub = true,
2165        _ => (),
2166    };
2167    *received_unsub && *received_punsub && num == 0
2168}
2169
2170/// Common logic for clearing subscriptions in RESP3 async/sync
2171pub fn resp3_is_pub_sub_state_cleared(
2172    received_unsub: &mut bool,
2173    received_punsub: &mut bool,
2174    kind: &PushKind,
2175    num: isize,
2176) -> bool {
2177    match kind {
2178        PushKind::Unsubscribe => *received_unsub = true,
2179        PushKind::PUnsubscribe => *received_punsub = true,
2180        _ => (),
2181    };
2182    *received_unsub && *received_punsub && num == 0
2183}
2184
2185pub fn no_sub_err_is_pub_sub_state_cleared(
2186    received_unsub: &mut bool,
2187    received_punsub: &mut bool,
2188    err: &ServerError,
2189) -> bool {
2190    let details = err.details();
2191    *received_unsub = *received_unsub
2192        || details
2193            .map(|details| details.starts_with("'unsub"))
2194            .unwrap_or_default();
2195    *received_punsub = *received_punsub
2196        || details
2197            .map(|details| details.starts_with("'punsub"))
2198            .unwrap_or_default();
2199    *received_unsub && *received_punsub
2200}
2201
2202/// Common logic for checking real cause of hello3 command error
2203pub fn get_resp3_hello_command_error(err: RedisError) -> RedisError {
2204    if let Some(detail) = err.detail() {
2205        if detail.starts_with("unknown command `HELLO`") {
2206            return (
2207                ErrorKind::RESP3NotSupported,
2208                "Redis Server doesn't support HELLO command therefore resp3 cannot be used",
2209            )
2210                .into();
2211        }
2212    }
2213    err
2214}
2215
2216#[cfg(test)]
2217mod tests {
2218    use super::*;
2219
2220    #[test]
2221    fn test_parse_redis_url() {
2222        let cases = vec![
2223            ("redis://127.0.0.1", true),
2224            ("redis://[::1]", true),
2225            ("rediss://127.0.0.1", true),
2226            ("rediss://[::1]", true),
2227            ("valkey://127.0.0.1", true),
2228            ("valkey://[::1]", true),
2229            ("valkeys://127.0.0.1", true),
2230            ("valkeys://[::1]", true),
2231            ("redis+unix:///run/redis.sock", true),
2232            ("valkey+unix:///run/valkey.sock", true),
2233            ("unix:///run/redis.sock", true),
2234            ("http://127.0.0.1", false),
2235            ("tcp://127.0.0.1", false),
2236        ];
2237        for (url, expected) in cases.into_iter() {
2238            let res = parse_redis_url(url);
2239            assert_eq!(
2240                res.is_some(),
2241                expected,
2242                "Parsed result of `{url}` is not expected",
2243            );
2244        }
2245    }
2246
2247    #[test]
2248    fn test_url_to_tcp_connection_info() {
2249        let cases = vec![
2250            (
2251                url::Url::parse("redis://127.0.0.1").unwrap(),
2252                ConnectionInfo {
2253                    addr: ConnectionAddr::Tcp("127.0.0.1".to_string(), 6379),
2254                    redis: Default::default(),
2255                },
2256            ),
2257            (
2258                url::Url::parse("redis://[::1]").unwrap(),
2259                ConnectionInfo {
2260                    addr: ConnectionAddr::Tcp("::1".to_string(), 6379),
2261                    redis: Default::default(),
2262                },
2263            ),
2264            (
2265                url::Url::parse("redis://%25johndoe%25:%23%40%3C%3E%24@example.com/2").unwrap(),
2266                ConnectionInfo {
2267                    addr: ConnectionAddr::Tcp("example.com".to_string(), 6379),
2268                    redis: RedisConnectionInfo {
2269                        db: 2,
2270                        username: Some("%johndoe%".to_string()),
2271                        password: Some("#@<>$".to_string()),
2272                        ..Default::default()
2273                    },
2274                },
2275            ),
2276            (
2277                url::Url::parse("redis://127.0.0.1/?protocol=2").unwrap(),
2278                ConnectionInfo {
2279                    addr: ConnectionAddr::Tcp("127.0.0.1".to_string(), 6379),
2280                    redis: Default::default(),
2281                },
2282            ),
2283            (
2284                url::Url::parse("redis://127.0.0.1/?protocol=resp3").unwrap(),
2285                ConnectionInfo {
2286                    addr: ConnectionAddr::Tcp("127.0.0.1".to_string(), 6379),
2287                    redis: RedisConnectionInfo {
2288                        protocol: ProtocolVersion::RESP3,
2289                        ..Default::default()
2290                    },
2291                },
2292            ),
2293        ];
2294        for (url, expected) in cases.into_iter() {
2295            let res = url_to_tcp_connection_info(url.clone()).unwrap();
2296            assert_eq!(res.addr, expected.addr, "addr of {url} is not expected");
2297            assert_eq!(
2298                res.redis.db, expected.redis.db,
2299                "db of {url} is not expected",
2300            );
2301            assert_eq!(
2302                res.redis.username, expected.redis.username,
2303                "username of {url} is not expected",
2304            );
2305            assert_eq!(
2306                res.redis.password, expected.redis.password,
2307                "password of {url} is not expected",
2308            );
2309        }
2310    }
2311
2312    #[test]
2313    fn test_url_to_tcp_connection_info_failed() {
2314        let cases = vec![
2315            (
2316                url::Url::parse("redis://").unwrap(),
2317                "Missing hostname",
2318                None,
2319            ),
2320            (
2321                url::Url::parse("redis://127.0.0.1/db").unwrap(),
2322                "Invalid database number",
2323                None,
2324            ),
2325            (
2326                url::Url::parse("redis://C3%B0@127.0.0.1").unwrap(),
2327                "Username is not valid UTF-8 string",
2328                None,
2329            ),
2330            (
2331                url::Url::parse("redis://:C3%B0@127.0.0.1").unwrap(),
2332                "Password is not valid UTF-8 string",
2333                None,
2334            ),
2335            (
2336                url::Url::parse("redis://127.0.0.1/?protocol=4").unwrap(),
2337                "Invalid protocol version",
2338                Some("4"),
2339            ),
2340        ];
2341        for (url, expected, detail) in cases.into_iter() {
2342            let res = url_to_tcp_connection_info(url).unwrap_err();
2343            assert_eq!(
2344                res.kind(),
2345                crate::ErrorKind::InvalidClientConfig,
2346                "{}",
2347                &res,
2348            );
2349            #[allow(deprecated)]
2350            let desc = std::error::Error::description(&res);
2351            assert_eq!(desc, expected, "{}", &res);
2352            assert_eq!(res.detail(), detail, "{}", &res);
2353        }
2354    }
2355
2356    #[test]
2357    #[cfg(unix)]
2358    fn test_url_to_unix_connection_info() {
2359        let cases = vec![
2360            (
2361                url::Url::parse("unix:///var/run/redis.sock").unwrap(),
2362                ConnectionInfo {
2363                    addr: ConnectionAddr::Unix("/var/run/redis.sock".into()),
2364                    redis: RedisConnectionInfo {
2365                        db: 0,
2366                        username: None,
2367                        password: None,
2368                        protocol: ProtocolVersion::RESP2,
2369                    },
2370                },
2371            ),
2372            (
2373                url::Url::parse("redis+unix:///var/run/redis.sock?db=1").unwrap(),
2374                ConnectionInfo {
2375                    addr: ConnectionAddr::Unix("/var/run/redis.sock".into()),
2376                    redis: RedisConnectionInfo {
2377                        db: 1,
2378                        ..Default::default()
2379                    },
2380                },
2381            ),
2382            (
2383                url::Url::parse(
2384                    "unix:///example.sock?user=%25johndoe%25&pass=%23%40%3C%3E%24&db=2",
2385                )
2386                .unwrap(),
2387                ConnectionInfo {
2388                    addr: ConnectionAddr::Unix("/example.sock".into()),
2389                    redis: RedisConnectionInfo {
2390                        db: 2,
2391                        username: Some("%johndoe%".to_string()),
2392                        password: Some("#@<>$".to_string()),
2393                        ..Default::default()
2394                    },
2395                },
2396            ),
2397            (
2398                url::Url::parse(
2399                    "redis+unix:///example.sock?pass=%26%3F%3D+%2A%2B&db=2&user=%25johndoe%25",
2400                )
2401                .unwrap(),
2402                ConnectionInfo {
2403                    addr: ConnectionAddr::Unix("/example.sock".into()),
2404                    redis: RedisConnectionInfo {
2405                        db: 2,
2406                        username: Some("%johndoe%".to_string()),
2407                        password: Some("&?= *+".to_string()),
2408                        ..Default::default()
2409                    },
2410                },
2411            ),
2412            (
2413                url::Url::parse("redis+unix:///var/run/redis.sock?protocol=3").unwrap(),
2414                ConnectionInfo {
2415                    addr: ConnectionAddr::Unix("/var/run/redis.sock".into()),
2416                    redis: RedisConnectionInfo {
2417                        protocol: ProtocolVersion::RESP3,
2418                        ..Default::default()
2419                    },
2420                },
2421            ),
2422        ];
2423        for (url, expected) in cases.into_iter() {
2424            assert_eq!(
2425                ConnectionAddr::Unix(url.to_file_path().unwrap()),
2426                expected.addr,
2427                "addr of {url} is not expected",
2428            );
2429            let res = url_to_unix_connection_info(url.clone()).unwrap();
2430            assert_eq!(res.addr, expected.addr, "addr of {url} is not expected");
2431            assert_eq!(
2432                res.redis.db, expected.redis.db,
2433                "db of {url} is not expected",
2434            );
2435            assert_eq!(
2436                res.redis.username, expected.redis.username,
2437                "username of {url} is not expected",
2438            );
2439            assert_eq!(
2440                res.redis.password, expected.redis.password,
2441                "password of {url} is not expected",
2442            );
2443        }
2444    }
2445}