redis/aio/
connection.rs

1#![allow(deprecated)]
2
3#[cfg(feature = "async-std-comp")]
4use super::async_std;
5use super::{setup_connection, AsyncStream, DefaultAsyncDNSResolver, RedisRuntime};
6use super::{AsyncDNSResolver, ConnectionLike};
7use crate::cmd::{cmd, Cmd};
8use crate::connection::{
9    resp2_is_pub_sub_state_cleared, resp3_is_pub_sub_state_cleared, ConnectionAddr, ConnectionInfo,
10    Msg, RedisConnectionInfo,
11};
12use crate::io::tcp::TcpSettings;
13#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
14use crate::parser::ValueCodec;
15use crate::types::{FromRedisValue, RedisFuture, RedisResult, Value};
16use crate::{from_owned_redis_value, ProtocolVersion, ToRedisArgs};
17use ::tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
18use combine::{parser::combinator::AnySendSyncPartialState, stream::PointerOffset};
19use futures_util::future::select_ok;
20use futures_util::{
21    future::FutureExt,
22    stream::{Stream, StreamExt},
23};
24use std::pin::Pin;
25#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
26use tokio_util::codec::Decoder;
27
28/// Represents a stateful redis TCP connection.
29#[deprecated(note = "aio::Connection is deprecated. Use aio::MultiplexedConnection instead.")]
30pub struct Connection<C = Pin<Box<dyn AsyncStream + Send + Sync>>> {
31    con: C,
32    buf: Vec<u8>,
33    decoder: combine::stream::Decoder<AnySendSyncPartialState, PointerOffset<[u8]>>,
34    db: i64,
35
36    // Flag indicating whether the connection was left in the PubSub state after dropping `PubSub`.
37    //
38    // This flag is checked when attempting to send a command, and if it's raised, we attempt to
39    // exit the pubsub state before executing the new request.
40    pubsub: bool,
41
42    // Field indicating which protocol to use for server communications.
43    protocol: ProtocolVersion,
44}
45
46fn assert_sync<T: Sync>() {}
47
48#[allow(unused)]
49fn test() {
50    assert_sync::<Connection>();
51}
52
53impl<C> Connection<C> {
54    pub(crate) fn map<D>(self, f: impl FnOnce(C) -> D) -> Connection<D> {
55        let Self {
56            con,
57            buf,
58            decoder,
59            db,
60            pubsub,
61            protocol,
62        } = self;
63        Connection {
64            con: f(con),
65            buf,
66            decoder,
67            db,
68            pubsub,
69            protocol,
70        }
71    }
72}
73
74impl<C> Connection<C>
75where
76    C: Unpin + AsyncRead + AsyncWrite + Send,
77{
78    /// Constructs a new `Connection` out of a `AsyncRead + AsyncWrite` object
79    /// and a `RedisConnectionInfo`
80    pub async fn new(connection_info: &RedisConnectionInfo, con: C) -> RedisResult<Self> {
81        let mut rv = Connection {
82            con,
83            buf: Vec::new(),
84            decoder: combine::stream::Decoder::new(),
85            db: connection_info.db,
86            pubsub: false,
87            protocol: connection_info.protocol,
88        };
89        setup_connection(
90            connection_info,
91            &mut rv,
92            #[cfg(feature = "cache-aio")]
93            None,
94        )
95        .await?;
96        Ok(rv)
97    }
98
99    /// Converts this [`Connection`] into [`PubSub`].
100    #[deprecated(note = "aio::Connection is deprecated. Use [Client::get_async_pubsub] instead")]
101    pub fn into_pubsub(self) -> PubSub<C> {
102        PubSub::new(self)
103    }
104
105    /// Converts this [`Connection`] into [`Monitor`]
106    #[deprecated(note = "aio::Connection is deprecated. Use [Client::get_async_pubsub] instead")]
107    pub fn into_monitor(self) -> Monitor<C> {
108        Monitor::new(self)
109    }
110
111    /// Fetches a single response from the connection.
112    async fn read_response(&mut self) -> RedisResult<Value> {
113        crate::parser::parse_redis_value_async(&mut self.decoder, &mut self.con).await
114    }
115
116    /// Brings [`Connection`] out of `PubSub` mode.
117    ///
118    /// This will unsubscribe this [`Connection`] from all subscriptions.
119    ///
120    /// If this function returns error then on all command send tries will be performed attempt
121    /// to exit from `PubSub` mode until it will be successful.
122    async fn exit_pubsub(&mut self) -> RedisResult<()> {
123        let res = self.clear_active_subscriptions().await;
124        if res.is_ok() {
125            self.pubsub = false;
126        } else {
127            // Raise the pubsub flag to indicate the connection is "stuck" in that state.
128            self.pubsub = true;
129        }
130
131        res
132    }
133
134    /// Get the inner connection out of a PubSub
135    ///
136    /// Any active subscriptions are unsubscribed. In the event of an error, the connection is
137    /// dropped.
138    async fn clear_active_subscriptions(&mut self) -> RedisResult<()> {
139        // Responses to unsubscribe commands return in a 3-tuple with values
140        // ("unsubscribe" or "punsubscribe", name of subscription removed, count of remaining subs).
141        // The "count of remaining subs" includes both pattern subscriptions and non pattern
142        // subscriptions. Thus, to accurately drain all unsubscribe messages received from the
143        // server, both commands need to be executed at once.
144        {
145            // Prepare both unsubscribe commands
146            let unsubscribe = crate::Pipeline::new()
147                .add_command(cmd("UNSUBSCRIBE"))
148                .add_command(cmd("PUNSUBSCRIBE"))
149                .get_packed_pipeline();
150
151            // Execute commands
152            self.con.write_all(&unsubscribe).await?;
153        }
154
155        // Receive responses
156        //
157        // There will be at minimum two responses - 1 for each of punsubscribe and unsubscribe
158        // commands. There may be more responses if there are active subscriptions. In this case,
159        // messages are received until the _subscription count_ in the responses reach zero.
160        let mut received_unsub = false;
161        let mut received_punsub = false;
162        if self.protocol != ProtocolVersion::RESP2 {
163            while let Value::Push { kind, data } =
164                from_owned_redis_value(self.read_response().await?)?
165            {
166                if data.len() >= 2 {
167                    if let Value::Int(num) = data[1] {
168                        if resp3_is_pub_sub_state_cleared(
169                            &mut received_unsub,
170                            &mut received_punsub,
171                            &kind,
172                            num as isize,
173                        ) {
174                            break;
175                        }
176                    }
177                }
178            }
179        } else {
180            loop {
181                let res: (Vec<u8>, (), isize) =
182                    from_owned_redis_value(self.read_response().await?)?;
183                if resp2_is_pub_sub_state_cleared(
184                    &mut received_unsub,
185                    &mut received_punsub,
186                    &res.0,
187                    res.2,
188                ) {
189                    break;
190                }
191            }
192        }
193
194        // Finally, the connection is back in its normal state since all subscriptions were
195        // cancelled *and* all unsubscribe messages were received.
196        Ok(())
197    }
198}
199
200#[cfg(feature = "async-std-comp")]
201#[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
202impl<C> Connection<async_std::AsyncStdWrapped<C>>
203where
204    C: Unpin + ::async_std::io::Read + ::async_std::io::Write + Send,
205{
206    /// Constructs a new `Connection` out of a `async_std::io::AsyncRead + async_std::io::AsyncWrite` object
207    /// and a `RedisConnectionInfo`
208    pub async fn new_async_std(connection_info: &RedisConnectionInfo, con: C) -> RedisResult<Self> {
209        Connection::new(connection_info, async_std::AsyncStdWrapped::new(con)).await
210    }
211}
212
213pub(crate) async fn connect<C>(connection_info: &ConnectionInfo) -> RedisResult<Connection<C>>
214where
215    C: Unpin + RedisRuntime + AsyncRead + AsyncWrite + Send,
216{
217    let con = connect_simple::<C>(
218        connection_info,
219        &DefaultAsyncDNSResolver,
220        &TcpSettings::default(),
221    )
222    .await?;
223    Connection::new(&connection_info.redis, con).await
224}
225
226impl<C> ConnectionLike for Connection<C>
227where
228    C: Unpin + AsyncRead + AsyncWrite + Send,
229{
230    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
231        (async move {
232            if self.pubsub {
233                self.exit_pubsub().await?;
234            }
235            self.buf.clear();
236            cmd.write_packed_command(&mut self.buf);
237            self.con.write_all(&self.buf).await?;
238            if cmd.is_no_response() {
239                return Ok(Value::Nil);
240            }
241            loop {
242                match self.read_response().await? {
243                    Value::Push { .. } => continue,
244                    val => return Ok(val),
245                }
246            }
247        })
248        .boxed()
249    }
250
251    fn req_packed_commands<'a>(
252        &'a mut self,
253        cmd: &'a crate::Pipeline,
254        offset: usize,
255        count: usize,
256    ) -> RedisFuture<'a, Vec<Value>> {
257        (async move {
258            if self.pubsub {
259                self.exit_pubsub().await?;
260            }
261
262            self.buf.clear();
263            cmd.write_packed_pipeline(&mut self.buf);
264            self.con.write_all(&self.buf).await?;
265
266            let mut first_err = None;
267
268            for _ in 0..offset {
269                let response = self.read_response().await;
270                match response {
271                    Ok(Value::ServerError(err)) => {
272                        if first_err.is_none() {
273                            first_err = Some(err.into());
274                        }
275                    }
276                    Err(err) => {
277                        if first_err.is_none() {
278                            first_err = Some(err);
279                        }
280                    }
281                    _ => {}
282                }
283            }
284
285            let mut rv = Vec::with_capacity(count);
286            let mut count = count;
287            let mut idx = 0;
288            while idx < count {
289                let response = self.read_response().await;
290                match response {
291                    Ok(item) => {
292                        // RESP3 can insert push data between command replies
293                        if let Value::Push { .. } = item {
294                            // if that is the case we have to extend the loop and handle push data
295                            count += 1;
296                        } else {
297                            rv.push(item);
298                        }
299                    }
300                    Err(err) => {
301                        if first_err.is_none() {
302                            first_err = Some(err);
303                        }
304                    }
305                }
306                idx += 1;
307            }
308
309            if let Some(err) = first_err {
310                Err(err)
311            } else {
312                Ok(rv)
313            }
314        })
315        .boxed()
316    }
317
318    fn get_db(&self) -> i64 {
319        self.db
320    }
321}
322
323/// Represents a `PubSub` connection.
324pub struct PubSub<C = Pin<Box<dyn AsyncStream + Send + Sync>>>(Connection<C>);
325
326/// Represents a `Monitor` connection.
327pub struct Monitor<C = Pin<Box<dyn AsyncStream + Send + Sync>>>(Connection<C>);
328
329impl<C> PubSub<C>
330where
331    C: Unpin + AsyncRead + AsyncWrite + Send,
332{
333    fn new(con: Connection<C>) -> Self {
334        Self(con)
335    }
336
337    /// Subscribes to a new channel(s).
338    pub async fn subscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
339        let mut cmd = cmd("SUBSCRIBE");
340        cmd.arg(channel);
341        if self.0.protocol != ProtocolVersion::RESP2 {
342            cmd.set_no_response(true);
343        }
344        cmd.query_async(&mut self.0).await
345    }
346
347    /// Subscribes to new channel(s) with pattern(s).
348    pub async fn psubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
349        let mut cmd = cmd("PSUBSCRIBE");
350        cmd.arg(pchannel);
351        if self.0.protocol != ProtocolVersion::RESP2 {
352            cmd.set_no_response(true);
353        }
354        cmd.query_async(&mut self.0).await
355    }
356
357    /// Unsubscribes from a channel.
358    pub async fn unsubscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
359        let mut cmd = cmd("UNSUBSCRIBE");
360        cmd.arg(channel);
361        if self.0.protocol != ProtocolVersion::RESP2 {
362            cmd.set_no_response(true);
363        }
364        cmd.query_async(&mut self.0).await
365    }
366
367    /// Unsubscribes from channel pattern(s).
368    pub async fn punsubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
369        let mut cmd = cmd("PUNSUBSCRIBE");
370        cmd.arg(pchannel);
371        if self.0.protocol != ProtocolVersion::RESP2 {
372            cmd.set_no_response(true);
373        }
374        cmd.query_async(&mut self.0).await
375    }
376
377    /// Returns [`Stream`] of [`Msg`]s from this [`PubSub`]s subscriptions.
378    ///
379    /// The message itself is still generic and can be converted into an appropriate type through
380    /// the helper methods on it.
381    pub fn on_message(&mut self) -> impl Stream<Item = Msg> + '_ {
382        ValueCodec::default()
383            .framed(&mut self.0.con)
384            .filter_map(|msg| Box::pin(async move { Msg::from_owned_value(msg.ok()?) }))
385    }
386
387    /// Returns [`Stream`] of [`Msg`]s from this [`PubSub`]s subscriptions consuming it.
388    ///
389    /// The message itself is still generic and can be converted into an appropriate type through
390    /// the helper methods on it.
391    /// This can be useful in cases where the stream needs to be returned or held by something other
392    /// than the [`PubSub`].
393    pub fn into_on_message(self) -> impl Stream<Item = Msg> {
394        ValueCodec::default()
395            .framed(self.0.con)
396            .filter_map(|msg| Box::pin(async move { Msg::from_owned_value(msg.ok()?) }))
397    }
398
399    /// Exits from `PubSub` mode and converts [`PubSub`] into [`Connection`].
400    #[deprecated(note = "aio::Connection is deprecated")]
401    pub async fn into_connection(mut self) -> Connection<C> {
402        self.0.exit_pubsub().await.ok();
403
404        self.0
405    }
406}
407
408impl<C> Monitor<C>
409where
410    C: Unpin + AsyncRead + AsyncWrite + Send,
411{
412    /// Create a [`Monitor`] from a [`Connection`]
413    pub fn new(con: Connection<C>) -> Self {
414        Self(con)
415    }
416
417    /// Deliver the MONITOR command to this [`Monitor`]ing wrapper.
418    pub async fn monitor(&mut self) -> RedisResult<()> {
419        cmd("MONITOR").query_async(&mut self.0).await
420    }
421
422    /// Returns [`Stream`] of [`FromRedisValue`] values from this [`Monitor`]ing connection
423    pub fn on_message<T: FromRedisValue>(&mut self) -> impl Stream<Item = T> + '_ {
424        ValueCodec::default()
425            .framed(&mut self.0.con)
426            .filter_map(|value| {
427                Box::pin(async move { T::from_owned_redis_value(value.ok()?).ok() })
428            })
429    }
430
431    /// Returns [`Stream`] of [`FromRedisValue`] values from this [`Monitor`]ing connection
432    pub fn into_on_message<T: FromRedisValue>(self) -> impl Stream<Item = T> {
433        ValueCodec::default()
434            .framed(self.0.con)
435            .filter_map(|value| {
436                Box::pin(async move { T::from_owned_redis_value(value.ok()?).ok() })
437            })
438    }
439}
440
441pub(crate) async fn connect_simple<T: RedisRuntime>(
442    connection_info: &ConnectionInfo,
443    dns_resolver: &dyn AsyncDNSResolver,
444    tcp_settings: &TcpSettings,
445) -> RedisResult<T> {
446    Ok(match connection_info.addr {
447        ConnectionAddr::Tcp(ref host, port) => {
448            let socket_addrs = dns_resolver.resolve(host, port).await?;
449            select_ok(socket_addrs.map(|addr| Box::pin(<T>::connect_tcp(addr, tcp_settings))))
450                .await?
451                .0
452        }
453
454        #[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
455        ConnectionAddr::TcpTls {
456            ref host,
457            port,
458            insecure,
459            ref tls_params,
460        } => {
461            let socket_addrs = dns_resolver.resolve(host, port).await?;
462            select_ok(socket_addrs.map(|socket_addr| {
463                Box::pin(<T>::connect_tcp_tls(
464                    host,
465                    socket_addr,
466                    insecure,
467                    tls_params,
468                    tcp_settings,
469                ))
470            }))
471            .await?
472            .0
473        }
474
475        #[cfg(not(any(feature = "tls-native-tls", feature = "tls-rustls")))]
476        ConnectionAddr::TcpTls { .. } => {
477            fail!((
478                crate::types::ErrorKind::InvalidClientConfig,
479                "Cannot connect to TCP with TLS without the tls feature"
480            ));
481        }
482
483        #[cfg(unix)]
484        ConnectionAddr::Unix(ref path) => <T>::connect_unix(path).await?,
485
486        #[cfg(not(unix))]
487        ConnectionAddr::Unix(_) => {
488            fail!((
489                crate::types::ErrorKind::InvalidClientConfig,
490                "Cannot connect to unix sockets \
491                 on this platform",
492            ))
493        }
494    })
495}