1use crate::cmd::Cmd;
3use crate::connection::{
4 check_connection_setup, connection_setup_pipeline, AuthResult, ConnectionSetupComponents,
5 RedisConnectionInfo,
6};
7use crate::io::AsyncDNSResolver;
8use crate::types::{RedisFuture, RedisResult, Value};
9use crate::{ErrorKind, PushInfo, RedisError};
10use ::tokio::io::{AsyncRead, AsyncWrite};
11use futures_util::Future;
12use std::net::SocketAddr;
13#[cfg(unix)]
14use std::path::Path;
15use std::pin::Pin;
16
17#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
18use ::async_std::net::ToSocketAddrs;
19
20#[cfg(feature = "async-std-comp")]
22#[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
23pub mod async_std;
24
25#[cfg(any(feature = "tls-rustls", feature = "tls-native-tls"))]
26use crate::connection::TlsConnParams;
27
28#[cfg(feature = "tokio-comp")]
30#[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
31pub mod tokio;
32
33mod pubsub;
34pub use pubsub::{PubSub, PubSubSink, PubSubStream};
35
/// Crate-internal abstraction over the stream type of an async runtime.
///
/// An implementor is itself an [`AsyncStream`] and additionally knows how to
/// construct itself for a given connection target and how to spawn futures.
pub(crate) trait RedisRuntime: AsyncStream + Send + Sync + Sized + 'static {
    /// Creates `Self` for `socket_addr`, honoring `tcp_settings`
    /// (presumably by opening a plain TCP connection — see the implementors).
    async fn connect_tcp(
        socket_addr: SocketAddr,
        tcp_settings: &crate::io::tcp::TcpSettings,
    ) -> RedisResult<Self>;

    /// TLS counterpart of `connect_tcp`; only compiled when one of the TLS
    /// features is enabled.
    ///
    /// NOTE(review): `hostname`, `insecure` and `tls_params` are presumably
    /// used to configure certificate verification — confirm with implementors.
    #[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
    async fn connect_tcp_tls(
        hostname: &str,
        socket_addr: SocketAddr,
        insecure: bool,
        tls_params: &Option<TlsConnParams>,
        tcp_settings: &crate::io::tcp::TcpSettings,
    ) -> RedisResult<Self>;

    /// Creates `Self` from a filesystem `path` (presumably a Unix domain
    /// socket); only available on Unix targets.
    #[cfg(unix)]
    async fn connect_unix(path: &Path) -> RedisResult<Self>;

    /// Hands the future `f` over to the runtime and returns a `TaskHandle`
    /// for the spawned work.
    fn spawn(f: impl Future<Output = ()> + Send + 'static) -> TaskHandle;

    /// Type-erases this stream into a pinned, heap-allocated
    /// `dyn AsyncStream + Send + Sync`.
    fn boxed(self) -> Pin<Box<dyn AsyncStream + Send + Sync>> {
        Box::pin(self)
    }
}
64
/// Shorthand trait for anything that is both [`AsyncRead`] and [`AsyncWrite`].
pub trait AsyncStream: AsyncRead + AsyncWrite {}
// Blanket impl: every type that is `AsyncRead + AsyncWrite` is an `AsyncStream`,
// so the trait never has to be implemented by hand.
impl<S> AsyncStream for S where S: AsyncRead + AsyncWrite {}
68
/// Interface of an asynchronous connection-like object that can execute
/// commands and pipelines and reports a database number.
pub trait ConnectionLike {
    /// Sends the single command `cmd` and returns a future resolving to the
    /// reply as a [`Value`].
    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value>;

    /// Sends the pipeline `cmd` and returns a future resolving to a list of
    /// reply values.
    ///
    /// NOTE(review): `offset` and `count` presumably select which of the
    /// replies are returned (skip `offset`, keep `count`) — confirm against
    /// the implementors. Within this file it is called with `0` and the
    /// pipeline length.
    #[doc(hidden)]
    fn req_packed_commands<'a>(
        &'a mut self,
        cmd: &'a crate::Pipeline,
        offset: usize,
        count: usize,
    ) -> RedisFuture<'a, Vec<Value>>;

    /// Returns the database number associated with this connection.
    fn get_db(&self) -> i64;
}
96
97async fn execute_connection_pipeline(
98 rv: &mut impl ConnectionLike,
99 (pipeline, instructions): (crate::Pipeline, ConnectionSetupComponents),
100) -> RedisResult<AuthResult> {
101 if pipeline.is_empty() {
102 return Ok(AuthResult::Succeeded);
103 }
104
105 let results = rv.req_packed_commands(&pipeline, 0, pipeline.len()).await?;
106
107 check_connection_setup(results, instructions)
108}
109
110async fn setup_connection(
112 connection_info: &RedisConnectionInfo,
113 con: &mut impl ConnectionLike,
114 #[cfg(feature = "cache-aio")] cache_config: Option<crate::caching::CacheConfig>,
115) -> RedisResult<()> {
116 if execute_connection_pipeline(
117 con,
118 connection_setup_pipeline(
119 connection_info,
120 true,
121 #[cfg(feature = "cache-aio")]
122 cache_config,
123 ),
124 )
125 .await?
126 == AuthResult::ShouldRetryWithoutUsername
127 {
128 execute_connection_pipeline(
129 con,
130 connection_setup_pipeline(
131 connection_info,
132 false,
133 #[cfg(feature = "cache-aio")]
134 cache_config,
135 ),
136 )
137 .await?;
138 }
139
140 Ok(())
141}
142
143mod connection;
144pub use connection::*;
145mod multiplexed_connection;
146pub use multiplexed_connection::*;
147#[cfg(feature = "connection-manager")]
148mod connection_manager;
149#[cfg(feature = "connection-manager")]
150#[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
151pub use connection_manager::*;
152mod runtime;
153pub(super) use runtime::*;
154
/// Returns early from the enclosing function with an
/// `ErrorKind::InvalidClientConfig` error when `$protocol` is
/// `ProtocolVersion::RESP2`.
///
/// Two forms are accepted:
/// * `check_resp3!(protocol)` — uses the default error message.
/// * `check_resp3!(protocol, message)` — uses `message` as the error message.
///
/// The enclosing function must return a `Result` whose error type is
/// `RedisError`.
///
/// All paths are fully qualified through `$crate`, and the expansion adds no
/// `use` item to the caller's scope. The macro can therefore be invoked more
/// than once in the same block, and the caller does not need `RedisError` or
/// `ProtocolVersion` imported.
macro_rules! check_resp3 {
    ($protocol: expr) => {
        if $protocol == $crate::types::ProtocolVersion::RESP2 {
            return Err($crate::RedisError::from((
                $crate::ErrorKind::InvalidClientConfig,
                "RESP3 is required for this command",
            )));
        }
    };

    ($protocol: expr, $message: expr) => {
        if $protocol == $crate::types::ProtocolVersion::RESP2 {
            return Err($crate::RedisError::from((
                $crate::ErrorKind::InvalidClientConfig,
                $message,
            )));
        }
    };
}
176
177pub(crate) use check_resp3;
178
/// Error returned by [`AsyncPushSender::send`] when a push message could not
/// be handed over to the receiving side. It carries no further details.
pub struct SendError;
181
/// A sink that accepts [`PushInfo`] messages.
///
/// Implementations must be `Send + Sync + 'static`. This file provides
/// implementations for several channel sender types, for closures, and for
/// `Arc`-wrapped senders.
pub trait AsyncPushSender: Send + Sync + 'static {
    /// Delivers `info` to the receiving side.
    ///
    /// Returns `Err(SendError)` when the message could not be delivered.
    fn send(&self, info: PushInfo) -> Result<(), SendError>;
}
189
190impl AsyncPushSender for ::tokio::sync::mpsc::UnboundedSender<PushInfo> {
191 fn send(&self, info: PushInfo) -> Result<(), SendError> {
192 match self.send(info) {
193 Ok(_) => Ok(()),
194 Err(_) => Err(SendError),
195 }
196 }
197}
198
199impl AsyncPushSender for ::tokio::sync::broadcast::Sender<PushInfo> {
200 fn send(&self, info: PushInfo) -> Result<(), SendError> {
201 match self.send(info) {
202 Ok(_) => Ok(()),
203 Err(_) => Err(SendError),
204 }
205 }
206}
207
208impl<T, Func: Fn(PushInfo) -> Result<(), T> + Send + Sync + 'static> AsyncPushSender for Func {
209 fn send(&self, info: PushInfo) -> Result<(), SendError> {
210 match self(info) {
211 Ok(_) => Ok(()),
212 Err(_) => Err(SendError),
213 }
214 }
215}
216
217impl AsyncPushSender for std::sync::mpsc::Sender<PushInfo> {
218 fn send(&self, info: PushInfo) -> Result<(), SendError> {
219 match self.send(info) {
220 Ok(_) => Ok(()),
221 Err(_) => Err(SendError),
222 }
223 }
224}
225
226impl<T> AsyncPushSender for std::sync::Arc<T>
227where
228 T: AsyncPushSender,
229{
230 fn send(&self, info: PushInfo) -> Result<(), SendError> {
231 self.as_ref().send(info)
232 }
233}
234
/// Stateless default implementation of [`AsyncDNSResolver`]; its `resolve`
/// delegates to `get_socket_addrs` in this module.
#[derive(Clone)]
pub(crate) struct DefaultAsyncDNSResolver;
238
239impl AsyncDNSResolver for DefaultAsyncDNSResolver {
240 fn resolve<'a, 'b: 'a>(
241 &'a self,
242 host: &'b str,
243 port: u16,
244 ) -> RedisFuture<'a, Box<dyn Iterator<Item = SocketAddr> + Send + 'a>> {
245 Box::pin(get_socket_addrs(host, port))
246 }
247}
248
249async fn get_socket_addrs(
250 host: &str,
251 port: u16,
252) -> RedisResult<Box<dyn Iterator<Item = SocketAddr> + Send + '_>> {
253 #[cfg(feature = "tokio-comp")]
254 let socket_addrs = ::tokio::net::lookup_host((host, port)).await?;
255 #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
256 let socket_addrs = (host, port).to_socket_addrs().await?;
257
258 let mut socket_addrs = socket_addrs.peekable();
259 match socket_addrs.peek() {
260 Some(_) => Ok(Box::new(socket_addrs)),
261 None => Err(RedisError::from((
262 ErrorKind::InvalidClientConfig,
263 "No address found for host",
264 ))),
265 }
266}