redis/aio/
mod.rs

1//! Adds async IO support to redis.
2use crate::cmd::Cmd;
3use crate::connection::{
4    check_connection_setup, connection_setup_pipeline, AuthResult, ConnectionSetupComponents,
5    RedisConnectionInfo,
6};
7use crate::io::AsyncDNSResolver;
8use crate::types::{RedisFuture, RedisResult, Value};
9use crate::{ErrorKind, PushInfo, RedisError};
10use ::tokio::io::{AsyncRead, AsyncWrite};
11use futures_util::Future;
12use std::net::SocketAddr;
13#[cfg(unix)]
14use std::path::Path;
15use std::pin::Pin;
16
17#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
18use ::async_std::net::ToSocketAddrs;
19
20/// Enables the async_std compatibility
21#[cfg(feature = "async-std-comp")]
22#[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
23pub mod async_std;
24
25#[cfg(any(feature = "tls-rustls", feature = "tls-native-tls"))]
26use crate::connection::TlsConnParams;
27
28/// Enables the tokio compatibility
29#[cfg(feature = "tokio-comp")]
30#[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
31pub mod tokio;
32
33mod pubsub;
34pub use pubsub::{PubSub, PubSubSink, PubSubStream};
35
36/// Represents the ability of connecting via TCP or via Unix socket
37pub(crate) trait RedisRuntime: AsyncStream + Send + Sync + Sized + 'static {
38    /// Performs a TCP connection
39    async fn connect_tcp(
40        socket_addr: SocketAddr,
41        tcp_settings: &crate::io::tcp::TcpSettings,
42    ) -> RedisResult<Self>;
43
44    // Performs a TCP TLS connection
45    #[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
46    async fn connect_tcp_tls(
47        hostname: &str,
48        socket_addr: SocketAddr,
49        insecure: bool,
50        tls_params: &Option<TlsConnParams>,
51        tcp_settings: &crate::io::tcp::TcpSettings,
52    ) -> RedisResult<Self>;
53
54    /// Performs a UNIX connection
55    #[cfg(unix)]
56    async fn connect_unix(path: &Path) -> RedisResult<Self>;
57
58    fn spawn(f: impl Future<Output = ()> + Send + 'static) -> TaskHandle;
59
60    fn boxed(self) -> Pin<Box<dyn AsyncStream + Send + Sync>> {
61        Box::pin(self)
62    }
63}
64
65/// Trait for objects that implements `AsyncRead` and `AsyncWrite`
66pub trait AsyncStream: AsyncRead + AsyncWrite {}
67impl<S> AsyncStream for S where S: AsyncRead + AsyncWrite {}
68
69/// An async abstraction over connections.
70pub trait ConnectionLike {
71    /// Sends an already encoded (packed) command into the TCP socket and
72    /// reads the single response from it.
73    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value>;
74
75    /// Sends multiple already encoded (packed) command into the TCP socket
76    /// and reads `count` responses from it.  This is used to implement
77    /// pipelining.
78    /// Important - this function is meant for internal usage, since it's
79    /// easy to pass incorrect `offset` & `count` parameters, which might
80    /// cause the connection to enter an erroneous state. Users shouldn't
81    /// call it, instead using the Pipeline::query_async function.
82    #[doc(hidden)]
83    fn req_packed_commands<'a>(
84        &'a mut self,
85        cmd: &'a crate::Pipeline,
86        offset: usize,
87        count: usize,
88    ) -> RedisFuture<'a, Vec<Value>>;
89
90    /// Returns the database this connection is bound to.  Note that this
91    /// information might be unreliable because it's initially cached and
92    /// also might be incorrect if the connection like object is not
93    /// actually connected.
94    fn get_db(&self) -> i64;
95}
96
97async fn execute_connection_pipeline(
98    rv: &mut impl ConnectionLike,
99    (pipeline, instructions): (crate::Pipeline, ConnectionSetupComponents),
100) -> RedisResult<AuthResult> {
101    if pipeline.is_empty() {
102        return Ok(AuthResult::Succeeded);
103    }
104
105    let results = rv.req_packed_commands(&pipeline, 0, pipeline.len()).await?;
106
107    check_connection_setup(results, instructions)
108}
109
110// Initial setup for every connection.
111async fn setup_connection(
112    connection_info: &RedisConnectionInfo,
113    con: &mut impl ConnectionLike,
114    #[cfg(feature = "cache-aio")] cache_config: Option<crate::caching::CacheConfig>,
115) -> RedisResult<()> {
116    if execute_connection_pipeline(
117        con,
118        connection_setup_pipeline(
119            connection_info,
120            true,
121            #[cfg(feature = "cache-aio")]
122            cache_config,
123        ),
124    )
125    .await?
126        == AuthResult::ShouldRetryWithoutUsername
127    {
128        execute_connection_pipeline(
129            con,
130            connection_setup_pipeline(
131                connection_info,
132                false,
133                #[cfg(feature = "cache-aio")]
134                cache_config,
135            ),
136        )
137        .await?;
138    }
139
140    Ok(())
141}
142
143mod connection;
144pub use connection::*;
145mod multiplexed_connection;
146pub use multiplexed_connection::*;
147#[cfg(feature = "connection-manager")]
148mod connection_manager;
149#[cfg(feature = "connection-manager")]
150#[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
151pub use connection_manager::*;
152mod runtime;
153pub(super) use runtime::*;
154
155macro_rules! check_resp3 {
156    ($protocol: expr) => {
157        use crate::types::ProtocolVersion;
158        if $protocol == ProtocolVersion::RESP2 {
159            return Err(RedisError::from((
160                crate::ErrorKind::InvalidClientConfig,
161                "RESP3 is required for this command",
162            )));
163        }
164    };
165
166    ($protocol: expr, $message: expr) => {
167        use crate::types::ProtocolVersion;
168        if $protocol == ProtocolVersion::RESP2 {
169            return Err(RedisError::from((
170                crate::ErrorKind::InvalidClientConfig,
171                $message,
172            )));
173        }
174    };
175}
176
177pub(crate) use check_resp3;
178
179/// An error showing that the receiver
180pub struct SendError;
181
182/// A trait for sender parts of a channel that can be used for sending push messages from async
183/// connection.
184pub trait AsyncPushSender: Send + Sync + 'static {
185    /// The sender must send without blocking, otherwise it will block the sending connection.
186    /// Should error when the receiver was closed, and pushing values on the sender is no longer viable.
187    fn send(&self, info: PushInfo) -> Result<(), SendError>;
188}
189
190impl AsyncPushSender for ::tokio::sync::mpsc::UnboundedSender<PushInfo> {
191    fn send(&self, info: PushInfo) -> Result<(), SendError> {
192        match self.send(info) {
193            Ok(_) => Ok(()),
194            Err(_) => Err(SendError),
195        }
196    }
197}
198
199impl AsyncPushSender for ::tokio::sync::broadcast::Sender<PushInfo> {
200    fn send(&self, info: PushInfo) -> Result<(), SendError> {
201        match self.send(info) {
202            Ok(_) => Ok(()),
203            Err(_) => Err(SendError),
204        }
205    }
206}
207
208impl<T, Func: Fn(PushInfo) -> Result<(), T> + Send + Sync + 'static> AsyncPushSender for Func {
209    fn send(&self, info: PushInfo) -> Result<(), SendError> {
210        match self(info) {
211            Ok(_) => Ok(()),
212            Err(_) => Err(SendError),
213        }
214    }
215}
216
217impl AsyncPushSender for std::sync::mpsc::Sender<PushInfo> {
218    fn send(&self, info: PushInfo) -> Result<(), SendError> {
219        match self.send(info) {
220            Ok(_) => Ok(()),
221            Err(_) => Err(SendError),
222        }
223    }
224}
225
226impl<T> AsyncPushSender for std::sync::Arc<T>
227where
228    T: AsyncPushSender,
229{
230    fn send(&self, info: PushInfo) -> Result<(), SendError> {
231        self.as_ref().send(info)
232    }
233}
234
235/// Default DNS resolver which uses the system's DNS resolver.
236#[derive(Clone)]
237pub(crate) struct DefaultAsyncDNSResolver;
238
239impl AsyncDNSResolver for DefaultAsyncDNSResolver {
240    fn resolve<'a, 'b: 'a>(
241        &'a self,
242        host: &'b str,
243        port: u16,
244    ) -> RedisFuture<'a, Box<dyn Iterator<Item = SocketAddr> + Send + 'a>> {
245        Box::pin(get_socket_addrs(host, port))
246    }
247}
248
249async fn get_socket_addrs(
250    host: &str,
251    port: u16,
252) -> RedisResult<Box<dyn Iterator<Item = SocketAddr> + Send + '_>> {
253    #[cfg(feature = "tokio-comp")]
254    let socket_addrs = ::tokio::net::lookup_host((host, port)).await?;
255    #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
256    let socket_addrs = (host, port).to_socket_addrs().await?;
257
258    let mut socket_addrs = socket_addrs.peekable();
259    match socket_addrs.peek() {
260        Some(_) => Ok(Box::new(socket_addrs)),
261        None => Err(RedisError::from((
262            ErrorKind::InvalidClientConfig,
263            "No address found for host",
264        ))),
265    }
266}