1use crate::cmd::Cmd;
3use crate::connection::{
4 check_connection_setup, connection_setup_pipeline, AuthResult, ConnectionSetupComponents,
5 RedisConnectionInfo,
6};
7use crate::io::AsyncDNSResolver;
8use crate::types::{closed_connection_error, RedisFuture, RedisResult, Value};
9use crate::{ErrorKind, PushInfo, RedisError};
10use ::tokio::io::{AsyncRead, AsyncWrite};
11use futures_util::{
12 future::{Future, FutureExt},
13 sink::{Sink, SinkExt},
14 stream::{Stream, StreamExt},
15};
16pub use monitor::Monitor;
17use std::net::SocketAddr;
18#[cfg(unix)]
19use std::path::Path;
20use std::pin::Pin;
21
22mod monitor;
23
24#[cfg(feature = "async-std-comp")]
26#[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
27pub mod async_std;
28
29#[cfg(any(feature = "tls-rustls", feature = "tls-native-tls"))]
30use crate::connection::TlsConnParams;
31
32#[cfg(feature = "smol-comp")]
34#[cfg_attr(docsrs, doc(cfg(feature = "smol-comp")))]
35pub mod smol;
36#[cfg(feature = "tokio-comp")]
38#[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
39pub mod tokio;
40
41mod pubsub;
42pub use pubsub::{PubSub, PubSubSink, PubSubStream};
43
44pub(crate) trait RedisRuntime: AsyncStream + Send + Sync + Sized + 'static {
46 async fn connect_tcp(
48 socket_addr: SocketAddr,
49 tcp_settings: &crate::io::tcp::TcpSettings,
50 ) -> RedisResult<Self>;
51
52 #[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
54 async fn connect_tcp_tls(
55 hostname: &str,
56 socket_addr: SocketAddr,
57 insecure: bool,
58 tls_params: &Option<TlsConnParams>,
59 tcp_settings: &crate::io::tcp::TcpSettings,
60 ) -> RedisResult<Self>;
61
62 #[cfg(unix)]
64 async fn connect_unix(path: &Path) -> RedisResult<Self>;
65
66 fn spawn(f: impl Future<Output = ()> + Send + 'static) -> TaskHandle;
67
68 fn boxed(self) -> Pin<Box<dyn AsyncStream + Send + Sync>> {
69 Box::pin(self)
70 }
71}
72
73pub trait AsyncStream: AsyncRead + AsyncWrite {}
75impl<S> AsyncStream for S where S: AsyncRead + AsyncWrite {}
76
/// Common interface of async connection types that can carry out requests.
///
/// Both request methods borrow the connection mutably for `'a` and return a
/// boxed [`RedisFuture`] tied to that borrow.
pub trait ConnectionLike {
    /// Issues the request described by `cmd`; the returned future yields the
    /// reply as a single [`Value`].
    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value>;

    /// Issues the requests described by the pipeline `cmd`; the returned future
    /// yields a list of reply values.
    ///
    /// NOTE(review): `offset` and `count` presumably select which replies are
    /// handed back (skip the first `offset`, return `count`) — confirm against
    /// the implementors. Hidden from the generated documentation.
    #[doc(hidden)]
    fn req_packed_commands<'a>(
        &'a mut self,
        cmd: &'a crate::Pipeline,
        offset: usize,
        count: usize,
    ) -> RedisFuture<'a, Vec<Value>>;

    /// Returns the database number associated with this connection.
    ///
    /// NOTE(review): whether this is a cached value or queried live is not
    /// visible here — verify in the implementors before relying on it.
    fn get_db(&self) -> i64;
}
104
105async fn execute_connection_pipeline<T>(
106 codec: &mut T,
107 (pipeline, instructions): (crate::Pipeline, ConnectionSetupComponents),
108) -> RedisResult<AuthResult>
109where
110 T: Sink<Vec<u8>, Error = RedisError>,
111 T: Stream<Item = RedisResult<Value>>,
112 T: Unpin + Send + 'static,
113{
114 let count = pipeline.len();
115 if count == 0 {
116 return Ok(AuthResult::Succeeded);
117 }
118 codec.send(pipeline.get_packed_pipeline()).await?;
119
120 let mut results = Vec::with_capacity(count);
121 for _ in 0..count {
122 let value = codec.next().await.ok_or_else(closed_connection_error)??;
123 results.push(value);
124 }
125
126 check_connection_setup(results, instructions)
127}
128
129pub(super) async fn setup_connection<T>(
130 codec: &mut T,
131 connection_info: &RedisConnectionInfo,
132 #[cfg(feature = "cache-aio")] cache_config: Option<crate::caching::CacheConfig>,
133) -> RedisResult<()>
134where
135 T: Sink<Vec<u8>, Error = RedisError>,
136 T: Stream<Item = RedisResult<Value>>,
137 T: Unpin + Send + 'static,
138{
139 if execute_connection_pipeline(
140 codec,
141 connection_setup_pipeline(
142 connection_info,
143 true,
144 #[cfg(feature = "cache-aio")]
145 cache_config,
146 ),
147 )
148 .await?
149 == AuthResult::ShouldRetryWithoutUsername
150 {
151 execute_connection_pipeline(
152 codec,
153 connection_setup_pipeline(
154 connection_info,
155 false,
156 #[cfg(feature = "cache-aio")]
157 cache_config,
158 ),
159 )
160 .await?;
161 }
162
163 Ok(())
164}
165
166mod connection;
167pub(crate) use connection::connect_simple;
168mod multiplexed_connection;
169pub use multiplexed_connection::*;
170#[cfg(feature = "connection-manager")]
171mod connection_manager;
172#[cfg(feature = "connection-manager")]
173#[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
174pub use connection_manager::*;
175mod runtime;
176#[cfg(all(
177 feature = "async-std-comp",
178 any(feature = "smol-comp", feature = "tokio-comp")
179))]
180pub use runtime::prefer_async_std;
181#[cfg(all(
182 feature = "smol-comp",
183 any(feature = "async-std-comp", feature = "tokio-comp")
184))]
185pub use runtime::prefer_smol;
186#[cfg(all(
187 feature = "tokio-comp",
188 any(feature = "async-std-comp", feature = "smol-comp")
189))]
190pub use runtime::prefer_tokio;
191pub(super) use runtime::*;
192
/// Error returned by [`AsyncPushSender::send`] when a push message could not be
/// handed over to its destination.
///
/// It carries no payload. `Debug` is derived so that a `Result<(), SendError>`
/// can be used with `unwrap`/`expect` and printed with `{:?}`.
#[derive(Debug)]
pub struct SendError;
195
/// A destination that push messages ([`PushInfo`]) can be forwarded to.
///
/// Implementors must be `Send + Sync + 'static`, and `send` takes `&self`, so a
/// sender can be shared between tasks without exclusive access.
pub trait AsyncPushSender: Send + Sync + 'static {
    /// Forwards `info` to the destination.
    ///
    /// Returns `Err(SendError)` when the message could not be delivered.
    fn send(&self, info: PushInfo) -> Result<(), SendError>;
}
202
203impl AsyncPushSender for ::tokio::sync::mpsc::UnboundedSender<PushInfo> {
204 fn send(&self, info: PushInfo) -> Result<(), SendError> {
205 match self.send(info) {
206 Ok(_) => Ok(()),
207 Err(_) => Err(SendError),
208 }
209 }
210}
211
212impl AsyncPushSender for ::tokio::sync::broadcast::Sender<PushInfo> {
213 fn send(&self, info: PushInfo) -> Result<(), SendError> {
214 match self.send(info) {
215 Ok(_) => Ok(()),
216 Err(_) => Err(SendError),
217 }
218 }
219}
220
221impl<T, Func: Fn(PushInfo) -> Result<(), T> + Send + Sync + 'static> AsyncPushSender for Func {
222 fn send(&self, info: PushInfo) -> Result<(), SendError> {
223 match self(info) {
224 Ok(_) => Ok(()),
225 Err(_) => Err(SendError),
226 }
227 }
228}
229
230impl AsyncPushSender for std::sync::mpsc::Sender<PushInfo> {
231 fn send(&self, info: PushInfo) -> Result<(), SendError> {
232 match self.send(info) {
233 Ok(_) => Ok(()),
234 Err(_) => Err(SendError),
235 }
236 }
237}
238
239impl<T> AsyncPushSender for std::sync::Arc<T>
240where
241 T: AsyncPushSender,
242{
243 fn send(&self, info: PushInfo) -> Result<(), SendError> {
244 self.as_ref().send(info)
245 }
246}
247
248#[derive(Clone)]
250pub(crate) struct DefaultAsyncDNSResolver;
251
252impl AsyncDNSResolver for DefaultAsyncDNSResolver {
253 fn resolve<'a, 'b: 'a>(
254 &'a self,
255 host: &'b str,
256 port: u16,
257 ) -> RedisFuture<'a, Box<dyn Iterator<Item = SocketAddr> + Send + 'a>> {
258 Box::pin(get_socket_addrs(host, port).map(|vec| {
259 Ok(Box::new(vec?.into_iter()) as Box<dyn Iterator<Item = SocketAddr> + Send>)
260 }))
261 }
262}
263
264async fn get_socket_addrs(host: &str, port: u16) -> RedisResult<Vec<SocketAddr>> {
265 let socket_addrs: Vec<_> = match Runtime::locate() {
266 #[cfg(feature = "tokio-comp")]
267 Runtime::Tokio => ::tokio::net::lookup_host((host, port))
268 .await
269 .map_err(RedisError::from)
270 .map(|iter| iter.collect()),
271 #[cfg(feature = "async-std-comp")]
272 Runtime::AsyncStd => Ok::<_, RedisError>(
273 ::async_std::net::ToSocketAddrs::to_socket_addrs(&(host, port))
274 .await
275 .map(|iter| iter.collect())?,
276 ),
277 #[cfg(feature = "smol-comp")]
278 Runtime::Smol => ::smol::net::resolve((host, port))
279 .await
280 .map_err(RedisError::from),
281 }?;
282
283 if socket_addrs.is_empty() {
284 Err(RedisError::from((
285 ErrorKind::InvalidClientConfig,
286 "No address found for host",
287 )))
288 } else {
289 Ok(socket_addrs)
290 }
291}