1#![allow(deprecated)]
2
3#[cfg(feature = "async-std-comp")]
4use super::async_std;
5use super::{setup_connection, AsyncStream, DefaultAsyncDNSResolver, RedisRuntime};
6use super::{AsyncDNSResolver, ConnectionLike};
7use crate::cmd::{cmd, Cmd};
8use crate::connection::{
9 resp2_is_pub_sub_state_cleared, resp3_is_pub_sub_state_cleared, ConnectionAddr, ConnectionInfo,
10 Msg, RedisConnectionInfo,
11};
12use crate::io::tcp::TcpSettings;
13#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
14use crate::parser::ValueCodec;
15use crate::types::{FromRedisValue, RedisFuture, RedisResult, Value};
16use crate::{from_owned_redis_value, ProtocolVersion, ToRedisArgs};
17use ::tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
18use combine::{parser::combinator::AnySendSyncPartialState, stream::PointerOffset};
19use futures_util::future::select_ok;
20use futures_util::{
21 future::FutureExt,
22 stream::{Stream, StreamExt},
23};
24use std::pin::Pin;
25#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
26use tokio_util::codec::Decoder;
27
/// A single (non-multiplexed) async connection over a stream of type `C`.
///
/// The stream type defaults to a boxed, pinned `dyn AsyncStream + Send + Sync`.
#[deprecated(note = "aio::Connection is deprecated. Use aio::MultiplexedConnection instead.")]
pub struct Connection<C = Pin<Box<dyn AsyncStream + Send + Sync>>> {
    /// The underlying stream that commands are written to and replies are read from.
    con: C,
    /// Scratch buffer that packed commands/pipelines are serialized into before being written.
    buf: Vec<u8>,
    /// Incremental decoder state handed to `parse_redis_value_async` when reading a reply.
    decoder: combine::stream::Decoder<AnySendSyncPartialState, PointerOffset<[u8]>>,
    /// Database index copied from the connection info; reported by `get_db`.
    db: i64,

    /// When `true`, the next request first tries to clear active subscriptions
    /// (see `exit_pubsub`). It is raised when such a clean-up attempt fails.
    pubsub: bool,

    /// Protocol version copied from the connection info; selects RESP2 vs. non-RESP2 handling.
    protocol: ProtocolVersion,
}
45
/// Compile-time helper: instantiating it with `T` only type-checks when `T: Sync`.
/// It does nothing at runtime.
fn assert_sync<T>()
where
    T: Sync,
{
}
47
// Static check only: this function is not expected to be called. It exists so
// that the build breaks if `Connection` (with its default stream type) ever
// stops being `Sync`.
#[allow(unused)]
fn test() {
    assert_sync::<Connection>();
}
52
53impl<C> Connection<C> {
54 pub(crate) fn map<D>(self, f: impl FnOnce(C) -> D) -> Connection<D> {
55 let Self {
56 con,
57 buf,
58 decoder,
59 db,
60 pubsub,
61 protocol,
62 } = self;
63 Connection {
64 con: f(con),
65 buf,
66 decoder,
67 db,
68 pubsub,
69 protocol,
70 }
71 }
72}
73
74impl<C> Connection<C>
75where
76 C: Unpin + AsyncRead + AsyncWrite + Send,
77{
78 pub async fn new(connection_info: &RedisConnectionInfo, con: C) -> RedisResult<Self> {
81 let mut rv = Connection {
82 con,
83 buf: Vec::new(),
84 decoder: combine::stream::Decoder::new(),
85 db: connection_info.db,
86 pubsub: false,
87 protocol: connection_info.protocol,
88 };
89 setup_connection(
90 connection_info,
91 &mut rv,
92 #[cfg(feature = "cache-aio")]
93 None,
94 )
95 .await?;
96 Ok(rv)
97 }
98
99 #[deprecated(note = "aio::Connection is deprecated. Use [Client::get_async_pubsub] instead")]
101 pub fn into_pubsub(self) -> PubSub<C> {
102 PubSub::new(self)
103 }
104
105 #[deprecated(note = "aio::Connection is deprecated. Use [Client::get_async_pubsub] instead")]
107 pub fn into_monitor(self) -> Monitor<C> {
108 Monitor::new(self)
109 }
110
111 async fn read_response(&mut self) -> RedisResult<Value> {
113 crate::parser::parse_redis_value_async(&mut self.decoder, &mut self.con).await
114 }
115
116 async fn exit_pubsub(&mut self) -> RedisResult<()> {
123 let res = self.clear_active_subscriptions().await;
124 if res.is_ok() {
125 self.pubsub = false;
126 } else {
127 self.pubsub = true;
129 }
130
131 res
132 }
133
134 async fn clear_active_subscriptions(&mut self) -> RedisResult<()> {
139 {
145 let unsubscribe = crate::Pipeline::new()
147 .add_command(cmd("UNSUBSCRIBE"))
148 .add_command(cmd("PUNSUBSCRIBE"))
149 .get_packed_pipeline();
150
151 self.con.write_all(&unsubscribe).await?;
153 }
154
155 let mut received_unsub = false;
161 let mut received_punsub = false;
162 if self.protocol != ProtocolVersion::RESP2 {
163 while let Value::Push { kind, data } =
164 from_owned_redis_value(self.read_response().await?)?
165 {
166 if data.len() >= 2 {
167 if let Value::Int(num) = data[1] {
168 if resp3_is_pub_sub_state_cleared(
169 &mut received_unsub,
170 &mut received_punsub,
171 &kind,
172 num as isize,
173 ) {
174 break;
175 }
176 }
177 }
178 }
179 } else {
180 loop {
181 let res: (Vec<u8>, (), isize) =
182 from_owned_redis_value(self.read_response().await?)?;
183 if resp2_is_pub_sub_state_cleared(
184 &mut received_unsub,
185 &mut received_punsub,
186 &res.0,
187 res.2,
188 ) {
189 break;
190 }
191 }
192 }
193
194 Ok(())
197 }
198}
199
#[cfg(feature = "async-std-comp")]
#[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
impl<C> Connection<async_std::AsyncStdWrapped<C>>
where
    C: Unpin + ::async_std::io::Read + ::async_std::io::Write + Send,
{
    /// Wraps an async-std stream in `AsyncStdWrapped` and builds a
    /// `Connection` on top of it via [`Connection::new`].
    pub async fn new_async_std(connection_info: &RedisConnectionInfo, con: C) -> RedisResult<Self> {
        let wrapped = async_std::AsyncStdWrapped::new(con);
        Self::new(connection_info, wrapped).await
    }
}
212
213pub(crate) async fn connect<C>(connection_info: &ConnectionInfo) -> RedisResult<Connection<C>>
214where
215 C: Unpin + RedisRuntime + AsyncRead + AsyncWrite + Send,
216{
217 let con = connect_simple::<C>(
218 connection_info,
219 &DefaultAsyncDNSResolver,
220 &TcpSettings::default(),
221 )
222 .await?;
223 Connection::new(&connection_info.redis, con).await
224}
225
226impl<C> ConnectionLike for Connection<C>
227where
228 C: Unpin + AsyncRead + AsyncWrite + Send,
229{
230 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
231 (async move {
232 if self.pubsub {
233 self.exit_pubsub().await?;
234 }
235 self.buf.clear();
236 cmd.write_packed_command(&mut self.buf);
237 self.con.write_all(&self.buf).await?;
238 if cmd.is_no_response() {
239 return Ok(Value::Nil);
240 }
241 loop {
242 match self.read_response().await? {
243 Value::Push { .. } => continue,
244 val => return Ok(val),
245 }
246 }
247 })
248 .boxed()
249 }
250
251 fn req_packed_commands<'a>(
252 &'a mut self,
253 cmd: &'a crate::Pipeline,
254 offset: usize,
255 count: usize,
256 ) -> RedisFuture<'a, Vec<Value>> {
257 (async move {
258 if self.pubsub {
259 self.exit_pubsub().await?;
260 }
261
262 self.buf.clear();
263 cmd.write_packed_pipeline(&mut self.buf);
264 self.con.write_all(&self.buf).await?;
265
266 let mut first_err = None;
267
268 for _ in 0..offset {
269 let response = self.read_response().await;
270 match response {
271 Ok(Value::ServerError(err)) => {
272 if first_err.is_none() {
273 first_err = Some(err.into());
274 }
275 }
276 Err(err) => {
277 if first_err.is_none() {
278 first_err = Some(err);
279 }
280 }
281 _ => {}
282 }
283 }
284
285 let mut rv = Vec::with_capacity(count);
286 let mut count = count;
287 let mut idx = 0;
288 while idx < count {
289 let response = self.read_response().await;
290 match response {
291 Ok(item) => {
292 if let Value::Push { .. } = item {
294 count += 1;
296 } else {
297 rv.push(item);
298 }
299 }
300 Err(err) => {
301 if first_err.is_none() {
302 first_err = Some(err);
303 }
304 }
305 }
306 idx += 1;
307 }
308
309 if let Some(err) = first_err {
310 Err(err)
311 } else {
312 Ok(rv)
313 }
314 })
315 .boxed()
316 }
317
318 fn get_db(&self) -> i64 {
319 self.db
320 }
321}
322
/// Wrapper around a [`Connection`] that exposes subscribe/unsubscribe
/// operations and a stream of incoming messages.
pub struct PubSub<C = Pin<Box<dyn AsyncStream + Send + Sync>>>(Connection<C>);
325
/// Wrapper around a [`Connection`] used to issue `MONITOR` and stream the
/// values the server sends afterwards.
pub struct Monitor<C = Pin<Box<dyn AsyncStream + Send + Sync>>>(Connection<C>);
328
329impl<C> PubSub<C>
330where
331 C: Unpin + AsyncRead + AsyncWrite + Send,
332{
333 fn new(con: Connection<C>) -> Self {
334 Self(con)
335 }
336
337 pub async fn subscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
339 let mut cmd = cmd("SUBSCRIBE");
340 cmd.arg(channel);
341 if self.0.protocol != ProtocolVersion::RESP2 {
342 cmd.set_no_response(true);
343 }
344 cmd.query_async(&mut self.0).await
345 }
346
347 pub async fn psubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
349 let mut cmd = cmd("PSUBSCRIBE");
350 cmd.arg(pchannel);
351 if self.0.protocol != ProtocolVersion::RESP2 {
352 cmd.set_no_response(true);
353 }
354 cmd.query_async(&mut self.0).await
355 }
356
357 pub async fn unsubscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
359 let mut cmd = cmd("UNSUBSCRIBE");
360 cmd.arg(channel);
361 if self.0.protocol != ProtocolVersion::RESP2 {
362 cmd.set_no_response(true);
363 }
364 cmd.query_async(&mut self.0).await
365 }
366
367 pub async fn punsubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
369 let mut cmd = cmd("PUNSUBSCRIBE");
370 cmd.arg(pchannel);
371 if self.0.protocol != ProtocolVersion::RESP2 {
372 cmd.set_no_response(true);
373 }
374 cmd.query_async(&mut self.0).await
375 }
376
377 pub fn on_message(&mut self) -> impl Stream<Item = Msg> + '_ {
382 ValueCodec::default()
383 .framed(&mut self.0.con)
384 .filter_map(|msg| Box::pin(async move { Msg::from_owned_value(msg.ok()?) }))
385 }
386
387 pub fn into_on_message(self) -> impl Stream<Item = Msg> {
394 ValueCodec::default()
395 .framed(self.0.con)
396 .filter_map(|msg| Box::pin(async move { Msg::from_owned_value(msg.ok()?) }))
397 }
398
399 #[deprecated(note = "aio::Connection is deprecated")]
401 pub async fn into_connection(mut self) -> Connection<C> {
402 self.0.exit_pubsub().await.ok();
403
404 self.0
405 }
406}
407
408impl<C> Monitor<C>
409where
410 C: Unpin + AsyncRead + AsyncWrite + Send,
411{
412 pub fn new(con: Connection<C>) -> Self {
414 Self(con)
415 }
416
417 pub async fn monitor(&mut self) -> RedisResult<()> {
419 cmd("MONITOR").query_async(&mut self.0).await
420 }
421
422 pub fn on_message<T: FromRedisValue>(&mut self) -> impl Stream<Item = T> + '_ {
424 ValueCodec::default()
425 .framed(&mut self.0.con)
426 .filter_map(|value| {
427 Box::pin(async move { T::from_owned_redis_value(value.ok()?).ok() })
428 })
429 }
430
431 pub fn into_on_message<T: FromRedisValue>(self) -> impl Stream<Item = T> {
433 ValueCodec::default()
434 .framed(self.0.con)
435 .filter_map(|value| {
436 Box::pin(async move { T::from_owned_redis_value(value.ok()?).ok() })
437 })
438 }
439}
440
441pub(crate) async fn connect_simple<T: RedisRuntime>(
442 connection_info: &ConnectionInfo,
443 dns_resolver: &dyn AsyncDNSResolver,
444 tcp_settings: &TcpSettings,
445) -> RedisResult<T> {
446 Ok(match connection_info.addr {
447 ConnectionAddr::Tcp(ref host, port) => {
448 let socket_addrs = dns_resolver.resolve(host, port).await?;
449 select_ok(socket_addrs.map(|addr| Box::pin(<T>::connect_tcp(addr, tcp_settings))))
450 .await?
451 .0
452 }
453
454 #[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
455 ConnectionAddr::TcpTls {
456 ref host,
457 port,
458 insecure,
459 ref tls_params,
460 } => {
461 let socket_addrs = dns_resolver.resolve(host, port).await?;
462 select_ok(socket_addrs.map(|socket_addr| {
463 Box::pin(<T>::connect_tcp_tls(
464 host,
465 socket_addr,
466 insecure,
467 tls_params,
468 tcp_settings,
469 ))
470 }))
471 .await?
472 .0
473 }
474
475 #[cfg(not(any(feature = "tls-native-tls", feature = "tls-rustls")))]
476 ConnectionAddr::TcpTls { .. } => {
477 fail!((
478 crate::types::ErrorKind::InvalidClientConfig,
479 "Cannot connect to TCP with TLS without the tls feature"
480 ));
481 }
482
483 #[cfg(unix)]
484 ConnectionAddr::Unix(ref path) => <T>::connect_unix(path).await?,
485
486 #[cfg(not(unix))]
487 ConnectionAddr::Unix(_) => {
488 fail!((
489 crate::types::ErrorKind::InvalidClientConfig,
490 "Cannot connect to unix sockets \
491 on this platform",
492 ))
493 }
494 })
495}