// redis/aio/multiplexed_connection.rs
1use super::{AsyncPushSender, ConnectionLike, Runtime, SharedHandleContainer, TaskHandle};
2#[cfg(feature = "cache-aio")]
3use crate::caching::{CacheManager, CacheStatistics, PrepareCacheResult};
4use crate::{
5    AsyncConnectionConfig, ProtocolVersion, PushInfo, RedisConnectionInfo, ServerError,
6    ToRedisArgs,
7    aio::setup_connection,
8    check_resp3, cmd,
9    cmd::Cmd,
10    errors::{RedisError, closed_connection_error},
11    parser::ValueCodec,
12    types::{RedisFuture, RedisResult, Value},
13};
14use ::tokio::{
15    io::{AsyncRead, AsyncWrite},
16    sync::{mpsc, oneshot},
17};
18#[cfg(feature = "token-based-authentication")]
19use {
20    crate::errors::ErrorKind,
21    arcstr::ArcStr,
22    log::{debug, error},
23    std::sync::atomic::{AtomicBool, Ordering},
24};
25
26use futures_util::{
27    future::{Future, FutureExt},
28    ready,
29    sink::Sink,
30    stream::{self, Stream, StreamExt},
31};
32use pin_project_lite::pin_project;
33use std::collections::VecDeque;
34use std::fmt;
35use std::fmt::Debug;
36use std::pin::Pin;
37use std::sync::Arc;
38use std::task::{self, Poll};
39use std::time::Duration;
40use tokio_util::codec::Decoder;
41
// Sender through which the result of a single request is delivered back to the caller.
type PipelineOutput = oneshot::Sender<RedisResult<Value>>;

// Failures observed while collecting a pipeline's responses.
enum ErrorOrErrors {
    // Server-side errors, each paired with the index of the command that caused it.
    Errors(Vec<(usize, ServerError)>),
    // only set if we receive a transmission error
    FirstError(RedisError),
}
50
// How responses for a request are accumulated before being sent to the caller.
enum ResponseAggregate {
    // Exactly one response is expected, and is forwarded as-is.
    SingleCommand,
    // Several responses are expected; they are gathered into `buffer` and returned
    // together as a single `Value::Array` (or an error, per `error_or_errors`).
    Pipeline {
        buffer: Vec<Value>,
        error_or_errors: ErrorOrErrors,
        expectation: PipelineResponseExpectation,
    },
}

// TODO - this is a really bad name.
struct PipelineResponseExpectation {
    // The number of responses to skip before starting to save responses in the buffer.
    skipped_response_count: usize,
    // The number of responses to keep in the buffer
    expected_response_count: usize,
    // whether the pipelined request is a transaction
    is_transaction: bool,
    // How many responses have been received so far; used to compute the command index
    // attached to server errors in transactions.
    seen_responses: usize,
}
70
71impl ResponseAggregate {
72    fn new(expectation: Option<PipelineResponseExpectation>) -> Self {
73        match expectation {
74            Some(expectation) => ResponseAggregate::Pipeline {
75                buffer: Vec::new(),
76                error_or_errors: ErrorOrErrors::Errors(Vec::new()),
77                expectation,
78            },
79            None => ResponseAggregate::SingleCommand,
80        }
81    }
82}
83
// A request that has been written to the connection and is awaiting its response(s).
struct InFlight {
    output: Option<PipelineOutput>,
    response_aggregate: ResponseAggregate,
}

// A single message sent through the pipeline
struct PipelineMessage {
    input: Vec<u8>,
    // If `output` is None, then the caller doesn't expect to receive an answer.
    output: Option<PipelineOutput>,
    // If `None`, this is a single request, not a pipeline of multiple requests.
    // If `Some`, the contained value describes how many responses to skip, how many
    // to keep, and whether the pipeline is a transaction.
    expectation: Option<PipelineResponseExpectation>,
}
99
/// Wrapper around a `Stream + Sink` where each item sent through the `Sink` results in one or more
/// items being output by the `Stream` (the number is specified at time of sending). The
/// interface provided by `Pipeline` offers a simple request-to-response mapping, hiding the
/// `Stream` and `Sink`.
#[derive(Clone)]
struct Pipeline {
    sender: mpsc::Sender<PipelineMessage>,
}
108
109impl Debug for Pipeline {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        f.debug_tuple("Pipeline").field(&self.sender).finish()
112    }
113}
114
#[cfg(feature = "cache-aio")]
pin_project! {
    // Drives the underlying sink/stream: encoded requests are written through the sink,
    // and incoming responses are matched against the in-flight requests awaiting them.
    struct PipelineSink<T> {
        #[pin]
        sink_stream: T,
        // Requests already written and still awaiting responses, in send order.
        in_flight: VecDeque<InFlight>,
        // A sticky write error, delivered to the next request that tries to send.
        error: Option<RedisError>,
        push_sender: Option<Arc<dyn AsyncPushSender>>,
        cache_manager: Option<CacheManager>,
    }
}

#[cfg(not(feature = "cache-aio"))]
pin_project! {
    // Same as the cache-enabled variant above, minus the cache manager field.
    struct PipelineSink<T> {
        #[pin]
        sink_stream: T,
        // Requests already written and still awaiting responses, in send order.
        in_flight: VecDeque<InFlight>,
        // A sticky write error, delivered to the next request that tries to send.
        error: Option<RedisError>,
        push_sender: Option<Arc<dyn AsyncPushSender>>,
    }
}
137
138fn send_push(push_sender: &Option<Arc<dyn AsyncPushSender>>, info: PushInfo) {
139    if let Some(sender) = push_sender {
140        let _ = sender.send(info);
141    };
142}
143
// Notifies the push sender (if any) that the connection has disconnected.
pub(crate) fn send_disconnect(push_sender: &Option<Arc<dyn AsyncPushSender>>) {
    send_push(push_sender, PushInfo::disconnect());
}
147
impl<T> PipelineSink<T>
where
    T: Stream<Item = RedisResult<Value>> + 'static,
{
    // Creates a sink with an empty in-flight queue and no stored error.
    fn new(
        sink_stream: T,
        push_sender: Option<Arc<dyn AsyncPushSender>>,
        #[cfg(feature = "cache-aio")] cache_manager: Option<CacheManager>,
    ) -> Self
    where
        T: Sink<Vec<u8>, Error = RedisError> + Stream<Item = RedisResult<Value>> + 'static,
    {
        PipelineSink {
            sink_stream,
            in_flight: VecDeque::new(),
            error: None,
            push_sender,
            #[cfg(feature = "cache-aio")]
            cache_manager,
        }
    }

    // Read messages from the stream and send them back to the caller
    fn poll_read(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Result<(), ()>> {
        loop {
            let item = ready!(self.as_mut().project().sink_stream.poll_next(cx));
            let item = match item {
                Some(result) => result,
                // The redis response stream is not going to produce any more items so we simulate a disconnection error to break out of the loop.
                None => Err(closed_connection_error()),
            };

            // An unrecoverable error terminates the whole sink, not just the current request.
            let is_unrecoverable = item.as_ref().is_err_and(|err| err.is_unrecoverable_error());
            self.as_mut().send_result(item);
            if is_unrecoverable {
                let self_ = self.project();
                send_disconnect(self_.push_sender);
                return Poll::Ready(Err(()));
            }
        }
    }

    // Handles a single item from the response stream: out-of-band push messages are
    // forwarded to the push sender, while replies are matched against the request at
    // the head of the in-flight queue and aggregated per its `ResponseAggregate`.
    fn send_result(self: Pin<&mut Self>, result: RedisResult<Value>) {
        let self_ = self.project();
        let result = match result {
            // If this push message isn't a reply, we'll pass it as-is to the push manager and stop iterating
            Ok(Value::Push { kind, data }) if !kind.has_reply() => {
                #[cfg(feature = "cache-aio")]
                if let Some(cache_manager) = &self_.cache_manager {
                    cache_manager.handle_push_value(&kind, &data);
                }
                send_push(self_.push_sender, PushInfo { kind, data });

                return;
            }
            // If this push message is a reply to a query, we'll clone it to the push manager and continue with sending the reply
            Ok(Value::Push { kind, data }) if kind.has_reply() => {
                send_push(
                    self_.push_sender,
                    PushInfo {
                        kind: kind.clone(),
                        data: data.clone(),
                    },
                );
                Ok(Value::Push { kind, data })
            }
            _ => result,
        };

        // A reply with no matching in-flight request is dropped.
        let mut entry = match self_.in_flight.pop_front() {
            Some(entry) => entry,
            None => return,
        };

        match &mut entry.response_aggregate {
            ResponseAggregate::SingleCommand => {
                // `Err` from `send` means the receiver was dropped; the caller no
                // longer cares about the result, so it is silently discarded.
                if let Some(output) = entry.output.take() {
                    _ = output.send(result);
                }
            }
            ResponseAggregate::Pipeline {
                buffer,
                error_or_errors,
                expectation:
                    PipelineResponseExpectation {
                        expected_response_count,
                        skipped_response_count,
                        is_transaction,
                        seen_responses,
                    },
            } => {
                *seen_responses += 1;
                if *skipped_response_count > 0 {
                    // server errors in skipped values are still counted for errors in transactions, since they're errors that will cause the transaction to fail,
                    // and we only skip values in transaction.
                    if *is_transaction {
                        if let ErrorOrErrors::Errors(errs) = error_or_errors {
                            match result {
                                Ok(Value::ServerError(err)) => {
                                    errs.push((*seen_responses - 2, err)); // - 1 to offset the early increment, and -1 to offset the added MULTI call.
                                }
                                Err(err) => *error_or_errors = ErrorOrErrors::FirstError(err),
                                _ => {}
                            }
                        }
                    }

                    *skipped_response_count -= 1;
                    // More responses are still expected for this request; requeue it at the head.
                    self_.in_flight.push_front(entry);
                    return;
                }

                match result {
                    Ok(item) => {
                        buffer.push(item);
                    }
                    Err(err) => {
                        // Only the first transmission error is kept; it takes
                        // precedence over any collected server errors.
                        if matches!(error_or_errors, ErrorOrErrors::Errors(_)) {
                            *error_or_errors = ErrorOrErrors::FirstError(err)
                        }
                    }
                }

                if buffer.len() < *expected_response_count {
                    // Need to gather more response values
                    self_.in_flight.push_front(entry);
                    return;
                }

                let response =
                    match std::mem::replace(error_or_errors, ErrorOrErrors::Errors(Vec::new())) {
                        ErrorOrErrors::Errors(errors) => {
                            if errors.is_empty() {
                                Ok(Value::Array(std::mem::take(buffer)))
                            } else {
                                Err(RedisError::make_aborted_transaction(errors))
                            }
                        }
                        ErrorOrErrors::FirstError(redis_error) => Err(redis_error),
                    };

                // `Err` means that the receiver was dropped in which case it does not
                // care about the output and we can continue by just dropping the value
                // and sender
                if let Some(output) = entry.output.take() {
                    _ = output.send(response);
                }
            }
        }
    }
}
299
impl<T> Sink<PipelineMessage> for PipelineSink<T>
where
    T: Sink<Vec<u8>, Error = RedisError> + Stream<Item = RedisResult<Value>> + 'static,
{
    type Error = ();

    // Retrieve incoming messages and write them to the sink
    fn poll_ready(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context,
    ) -> Poll<Result<(), Self::Error>> {
        match ready!(self.as_mut().project().sink_stream.poll_ready(cx)) {
            Ok(()) => Ok(()).into(),
            Err(err) => {
                // Stash the error and still report readiness, so `start_send` can
                // deliver the error to the request that triggered it.
                *self.project().error = Some(err);
                Ok(()).into()
            }
        }
    }

    fn start_send(
        mut self: Pin<&mut Self>,
        PipelineMessage {
            input,
            mut output,
            expectation,
        }: PipelineMessage,
    ) -> Result<(), Self::Error> {
        // If initially a receiver was created, but then dropped, there is nothing to receive our
        // output, so we do not need to send the message, as it is ambiguous whether the message
        // will be sent anyway. Helps shed some load on the connection.
        if output.as_ref().is_some_and(|output| output.is_closed()) {
            return Ok(());
        }

        let self_ = self.as_mut().project();

        // Report an error stored by `poll_ready` to this request's caller.
        if let Some(err) = self_.error.take() {
            if let Some(output) = output.take() {
                _ = output.send(Err(err));
            }
            return Err(());
        }

        match self_.sink_stream.start_send(input) {
            Ok(()) => {
                // Successfully written: register the request so incoming responses
                // can be matched to it.
                let response_aggregate = ResponseAggregate::new(expectation);
                let entry = InFlight {
                    output,
                    response_aggregate,
                };

                self_.in_flight.push_back(entry);
                Ok(())
            }
            Err(err) => {
                if let Some(output) = output.take() {
                    _ = output.send(Err(err));
                }
                Err(())
            }
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context,
    ) -> Poll<Result<(), Self::Error>> {
        ready!(
            self.as_mut()
                .project()
                .sink_stream
                .poll_flush(cx)
                .map_err(|err| {
                    // A flush failure is reported to the oldest in-flight request.
                    self.as_mut().send_result(Err(err));
                })
        )?;
        // Flushing also drives reads, so responses are delivered without a separate read task.
        self.poll_read(cx)
    }

    fn poll_close(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context,
    ) -> Poll<Result<(), Self::Error>> {
        // No new requests will come in after the first call to `close` but we need to complete any
        // in progress requests before closing
        if !self.in_flight.is_empty() {
            ready!(self.as_mut().poll_flush(cx))?;
        }
        let this = self.as_mut().project();
        this.sink_stream.poll_close(cx).map_err(|err| {
            self.send_result(Err(err));
        })
    }
}
395
impl Pipeline {
    // Default bound on the request channel between connection clones and the driver task.
    const DEFAULT_BUFFER_SIZE: usize = 50;

    // Returns the configured buffer size, falling back to the default when unset.
    fn resolve_buffer_size(size: Option<usize>) -> usize {
        size.unwrap_or(Self::DEFAULT_BUFFER_SIZE)
    }

    // Creates the pipeline handle plus the driver future, which must be polled
    // (usually spawned) for any request to make progress.
    fn new<T>(
        sink_stream: T,
        push_sender: Option<Arc<dyn AsyncPushSender>>,
        #[cfg(feature = "cache-aio")] cache_manager: Option<CacheManager>,
        buffer_size: usize,
    ) -> (Self, impl Future<Output = ()>)
    where
        T: Sink<Vec<u8>, Error = RedisError>,
        T: Stream<Item = RedisResult<Value>>,
        T: Unpin + Send + 'static,
    {
        let (sender, mut receiver) = mpsc::channel(buffer_size);

        let sink = PipelineSink::new(
            sink_stream,
            push_sender,
            #[cfg(feature = "cache-aio")]
            cache_manager,
        );
        // Forward every queued request into the sink; the combined future is the driver.
        let f = stream::poll_fn(move |cx| receiver.poll_recv(cx))
            .map(Ok)
            .forward(sink)
            .map(|_| ());
        (Pipeline { sender }, f)
    }

    // Sends one encoded request and awaits its (possibly aggregated) response,
    // applying `timeout` to the whole round trip when provided.
    async fn send_recv(
        &mut self,
        input: Vec<u8>,
        // If `None`, this is a single request, not a pipeline of multiple requests.
        // If `Some`, the value inside defines how the response should look like
        expectation: Option<PipelineResponseExpectation>,
        timeout: Option<Duration>,
        skip_response: bool,
    ) -> Result<Value, RedisError> {
        if input.is_empty() {
            return Err(RedisError::make_empty_command());
        }

        // Errors are carried internally as `Option<RedisError>`: `None` means the
        // pipeline task is gone and is mapped to a closed-connection error below.
        let request = async {
            if skip_response {
                // Fire-and-forget: no oneshot receiver is registered, so no reply
                // will be routed back for this request.
                self.sender
                    .send(PipelineMessage {
                        input,
                        expectation,
                        output: None,
                    })
                    .await
                    .map_err(|_| None)?;

                return Ok(Value::Nil);
            }

            let (sender, receiver) = oneshot::channel();

            self.sender
                .send(PipelineMessage {
                    input,
                    expectation,
                    output: Some(sender),
                })
                .await
                .map_err(|_| None)?;

            receiver.await
            // The `sender` was dropped which likely means that the stream part
            // failed for one reason or another
            .map_err(|_| None)
            .and_then(|res| res.map_err(Some))
        };

        match timeout {
            Some(timeout) => match Runtime::locate().timeout(timeout, request).await {
                Ok(res) => res,
                Err(elapsed) => Err(Some(elapsed.into())),
            },
            None => request.await,
        }
        .map_err(|err| err.unwrap_or_else(closed_connection_error))
    }
}
484
/// A connection object which can be cloned, allowing requests to be sent concurrently
/// on the same underlying connection (tcp/unix socket).
///
/// This connection object is cancellation-safe, and the user can drop request future without polling them to completion,
/// but this doesn't mean that the actual request sent to the server is cancelled.
/// A side-effect of this is that the underlying connection won't be closed until all sent requests have been answered,
/// which means that in case of blocking commands, the underlying connection resource might not be released,
/// even when all clones of the multiplexed connection have been dropped (see <https://github.com/redis-rs/redis-rs/issues/1236>).
/// This isn't an issue in a connection that was created in a canonical way, which ensures that `_task_handle` is set, so that
/// once all of the connection's clones are dropped, the task will also be dropped. If the user creates the connection in
/// another way and `_task_handle` isn't set, they should manually spawn the returned driver function, keep the spawned task's
/// handle and abort the task whenever they want, at the risk of effectively closing the clones of the multiplexed connection.
#[derive(Clone)]
pub struct MultiplexedConnection {
    pipeline: Pipeline,
    db: i64,
    response_timeout: Option<Duration>,
    protocol: ProtocolVersion,
    // This handle ensures that once all the clones of the connection will be dropped, the underlying task will stop.
    // This handle is only set for connection whose task was spawned by the crate, not for users who spawned their own
    // task.
    _task_handle: Option<SharedHandleContainer>,
    #[cfg(feature = "cache-aio")]
    pub(crate) cache_manager: Option<CacheManager>,
    #[cfg(feature = "token-based-authentication")]
    // This handle ensures that once all the clones of the connection will be dropped, the underlying task will stop.
    // It is only set for connections that use a credentials provider for token-based authentication.
    _credentials_subscription_task_handle: Option<SharedHandleContainer>,
    /// Flag indicating that re-authentication has failed and the connection is no longer usable.
    /// When set, all subsequent commands will fail immediately with an authentication error.
    #[cfg(feature = "token-based-authentication")]
    re_authentication_failed: Arc<AtomicBool>,
}
518
impl Debug for MultiplexedConnection {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring (no `..`) so that adding a field to the struct
        // forces this impl to be revisited.
        let MultiplexedConnection {
            pipeline,
            db,
            response_timeout,
            protocol,
            _task_handle,
            #[cfg(feature = "cache-aio")]
                cache_manager: _,
            #[cfg(feature = "token-based-authentication")]
                _credentials_subscription_task_handle: _,
            #[cfg(feature = "token-based-authentication")]
                re_authentication_failed: _,
        } = self;

        f.debug_struct("MultiplexedConnection")
            .field("pipeline", &pipeline)
            .field("db", &db)
            .field("response_timeout", &response_timeout)
            .field("protocol", &protocol)
            .finish()
    }
}
543
544impl MultiplexedConnection {
    /// Constructs a new `MultiplexedConnection` out of a `AsyncRead + AsyncWrite` object
    /// and a `RedisConnectionInfo`
    ///
    /// Equivalent to calling [`MultiplexedConnection::new_with_config`] with the
    /// default [`AsyncConnectionConfig`].
    pub async fn new<C>(
        connection_info: &RedisConnectionInfo,
        stream: C,
    ) -> RedisResult<(Self, impl Future<Output = ()>)>
    where
        C: Unpin + AsyncRead + AsyncWrite + Send + 'static,
    {
        Self::new_with_config(connection_info, stream, AsyncConnectionConfig::default()).await
    }
556
    /// Constructs a new `MultiplexedConnection` out of a `AsyncRead + AsyncWrite` object
    /// , a `RedisConnectionInfo` and a `AsyncConnectionConfig`.
    pub async fn new_with_config<C>(
        connection_info: &RedisConnectionInfo,
        stream: C,
        config: AsyncConnectionConfig,
    ) -> RedisResult<(Self, impl Future<Output = ()> + 'static)>
    where
        C: Unpin + AsyncRead + AsyncWrite + Send + 'static,
    {
        let mut codec = ValueCodec::default().framed(stream);
        // Push messages only exist in RESP3, so a push sender is rejected otherwise.
        if config.push_sender.is_some() {
            check_resp3!(
                connection_info.protocol,
                "Can only pass push sender to a connection using RESP3"
            );
        }

        // Extract the cache configuration first, so `setup_connection` can enable
        // server-side tracking during the handshake.
        #[cfg(feature = "cache-aio")]
        let cache_config = config.cache.as_ref().map(|cache| match cache {
            crate::client::Cache::Config(cache_config) => *cache_config,
            #[cfg(any(feature = "connection-manager", feature = "cluster-async"))]
            crate::client::Cache::Manager(cache_manager) => cache_manager.cache_config,
        });
        #[cfg(feature = "cache-aio")]
        let cache_manager_opt = config
            .cache
            .map(|cache| {
                check_resp3!(
                    connection_info.protocol,
                    "Can only enable client side caching in a connection using RESP3"
                );
                match cache {
                    crate::client::Cache::Config(cache_config) => {
                        Ok(CacheManager::new(cache_config))
                    }
                    #[cfg(any(feature = "connection-manager", feature = "cluster-async"))]
                    crate::client::Cache::Manager(cache_manager) => Ok(cache_manager),
                }
            })
            .transpose()?;

        // Token-based auth may overwrite the credentials below, so the clone is
        // mutable only when that feature is enabled.
        #[cfg(feature = "token-based-authentication")]
        let mut connection_info = connection_info.clone();
        #[cfg(not(feature = "token-based-authentication"))]
        let connection_info = connection_info.clone();

        #[cfg(feature = "token-based-authentication")]
        if let Some(ref credentials_provider) = config.credentials_provider {
            // Retrieve the initial credentials from the provider and apply them to the connection info
            match credentials_provider.subscribe().next().await {
                Some(Ok(credentials)) => {
                    connection_info.username = Some(ArcStr::from(credentials.username));
                    connection_info.password = Some(ArcStr::from(credentials.password));
                }
                Some(Err(err)) => {
                    error!("Error while receiving credentials from stream: {err}");
                    return Err(err);
                }
                None => {
                    let err = RedisError::from((
                        ErrorKind::AuthenticationFailed,
                        "Credentials stream closed unexpectedly before yielding credentials!",
                    ));
                    error!("{err}");
                    return Err(err);
                }
            }
        }

        // Performs the connection handshake (auth, protocol, db selection, ...).
        setup_connection(
            &mut codec,
            &connection_info,
            #[cfg(feature = "cache-aio")]
            cache_config,
        )
        .await?;
        // NOTE(review): this RESP3/push-sender check already ran before the handshake
        // above; the repetition looks redundant — confirm before removing.
        if config.push_sender.is_some() {
            check_resp3!(
                connection_info.protocol,
                "Can only pass push sender to a connection using RESP3"
            );
        }

        let (pipeline, driver) = Pipeline::new(
            codec,
            config.push_sender,
            #[cfg(feature = "cache-aio")]
            cache_manager_opt.clone(),
            Pipeline::resolve_buffer_size(config.pipeline_buffer_size),
        );

        let con = MultiplexedConnection {
            pipeline,
            db: connection_info.db,
            response_timeout: config.response_timeout,
            protocol: connection_info.protocol,
            _task_handle: None,
            #[cfg(feature = "cache-aio")]
            cache_manager: cache_manager_opt,
            #[cfg(feature = "token-based-authentication")]
            _credentials_subscription_task_handle: None,
            #[cfg(feature = "token-based-authentication")]
            re_authentication_failed: Arc::new(AtomicBool::new(false)),
        };

        // Set up streaming credentials subscription if provider is available
        #[cfg(feature = "token-based-authentication")]
        if let Some(streaming_provider) = config.credentials_provider {
            let mut inner_connection = con.clone();
            let re_authentication_failed_arc = Arc::clone(&con.re_authentication_failed);
            let mut stream = streaming_provider.subscribe();

            // Background task: re-authenticate whenever the provider yields new
            // credentials; on failure, poison the connection via the shared flag.
            let subscription_task_handle = Runtime::locate().spawn(async move {
                let mut error_cause_logged = false;
                while let Some(result) = stream.next().await {
                    match result {
                        Ok(credentials) => {
                            if let Err(err) = inner_connection
                                .re_authenticate_with_credentials(&credentials)
                                .await
                            {
                                error!("Failed to re-authenticate async connection: {err}.");
                                error_cause_logged = true;
                                re_authentication_failed_arc.store(true, Ordering::Relaxed);
                                break;
                            } else {
                                debug!("Re-authenticated async connection");
                            }
                        }
                        Err(err) => {
                            error!("Credentials stream error for async connection: {err}.");
                            error_cause_logged = true;
                        }
                    }
                }
                // The stream ending for any reason also marks the connection unusable.
                if !re_authentication_failed_arc.load(Ordering::Relaxed) {
                    if !error_cause_logged {
                        error!("Re-authentication stream ended unexpectedly.");
                    }
                    re_authentication_failed_arc.store(true, Ordering::Relaxed);
                }
            });
            return Ok((
                Self {
                    _credentials_subscription_task_handle: Some(SharedHandleContainer::new(
                        subscription_task_handle,
                    )),
                    ..con
                },
                driver,
            ));
        }

        Ok((con, driver))
    }
713
    /// This should be called strictly before the multiplexed connection is cloned - that is, before it is returned to the user.
    /// Otherwise some clones will be able to kill the backing task, while other clones are still alive.
    pub(crate) fn set_task_handle(&mut self, handle: TaskHandle) {
        // Wrapping in `SharedHandleContainer` ties the task's lifetime to the set of clones.
        self._task_handle = Some(SharedHandleContainer::new(handle));
    }
719
720    /// Sets the time that the multiplexer will wait for responses on operations before failing.
721    pub fn set_response_timeout(&mut self, timeout: std::time::Duration) {
722        self.response_timeout = Some(timeout);
723    }
724
    /// Sends an already encoded (packed) command into the TCP socket and
    /// reads the single response from it.
    pub async fn send_packed_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
        // Once re-authentication has failed, the connection is permanently unusable.
        #[cfg(feature = "token-based-authentication")]
        if self.re_authentication_failed.load(Ordering::Relaxed) {
            return Err(RedisError::from((
                ErrorKind::AuthenticationFailed,
                "Connection is no longer usable due to re-authentication failure",
            )));
        }
        #[cfg(feature = "cache-aio")]
        if let Some(cache_manager) = &self.cache_manager {
            match cache_manager.get_cached_cmd(cmd) {
                // Served entirely from the local cache; no network round trip.
                PrepareCacheResult::Cached(value) => return Ok(value),
                PrepareCacheResult::NotCached(cacheable_command) => {
                    // The command is repacked into a pipeline by the cache layer,
                    // and all of that pipeline's replies are collected.
                    let mut pipeline = crate::Pipeline::new();
                    cacheable_command.pack_command(cache_manager, &mut pipeline);

                    let result = self
                        .pipeline
                        .send_recv(
                            pipeline.get_packed_pipeline(),
                            Some(PipelineResponseExpectation {
                                skipped_response_count: 0,
                                expected_response_count: pipeline.commands.len(),
                                is_transaction: false,
                                seen_responses: 0,
                            }),
                            self.response_timeout,
                            cmd.is_no_response(),
                        )
                        .await?;
                    let replies: Vec<Value> = crate::types::from_redis_value(result)?;
                    return cacheable_command.resolve(cache_manager, replies.into_iter());
                }
                // Other cache outcomes fall through to the plain single-command path.
                _ => (),
            }
        }
        // Plain path: one request, one response.
        self.pipeline
            .send_recv(
                cmd.get_packed_command(),
                None,
                self.response_timeout,
                cmd.is_no_response(),
            )
            .await
    }
772
    /// Sends multiple already encoded (packed) command into the TCP socket
    /// and reads `count` responses from it.  This is used to implement
    /// pipelining.
    pub async fn send_packed_commands(
        &mut self,
        cmd: &crate::Pipeline,
        offset: usize,
        count: usize,
    ) -> RedisResult<Vec<Value>> {
        // Once re-authentication has failed, the connection is permanently unusable.
        #[cfg(feature = "token-based-authentication")]
        if self.re_authentication_failed.load(Ordering::Relaxed) {
            return Err(RedisError::from((
                ErrorKind::AuthenticationFailed,
                "Connection is no longer usable due to re-authentication failure",
            )));
        }
        #[cfg(feature = "cache-aio")]
        if let Some(cache_manager) = &self.cache_manager {
            // The cache layer may rewrite the pipeline and change how many responses
            // to skip/keep, so its counts replace the caller's `offset`/`count`.
            let (cacheable_pipeline, pipeline, (skipped_response_count, expected_response_count)) =
                cache_manager.get_cached_pipeline(cmd);
            if pipeline.is_empty() {
                // Everything was answered from the cache; nothing to send.
                return cacheable_pipeline.resolve(cache_manager, Value::Array(Vec::new()));
            }
            let result = self
                .pipeline
                .send_recv(
                    pipeline.get_packed_pipeline(),
                    Some(PipelineResponseExpectation {
                        skipped_response_count,
                        expected_response_count,
                        is_transaction: cacheable_pipeline.transaction_mode,
                        seen_responses: 0,
                    }),
                    self.response_timeout,
                    false,
                )
                .await?;

            return cacheable_pipeline.resolve(cache_manager, result);
        }
        let value = self
            .pipeline
            .send_recv(
                cmd.get_packed_pipeline(),
                Some(PipelineResponseExpectation {
                    skipped_response_count: offset,
                    expected_response_count: count,
                    is_transaction: cmd.is_transaction(),
                    seen_responses: 0,
                }),
                self.response_timeout,
                false,
            )
            .await?;
        match value {
            Value::Array(values) => Ok(values),
            // A non-array result (e.g. an aggregated error value) is wrapped as a
            // single-element vector.
            _ => Ok(vec![value]),
        }
    }
832
833    /// Gets [`CacheStatistics`] for current connection if caching is enabled.
834    #[cfg(feature = "cache-aio")]
835    #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
836    pub fn get_cache_statistics(&self) -> Option<CacheStatistics> {
837        self.cache_manager.as_ref().map(|cm| cm.statistics())
838    }
839}
840
841impl ConnectionLike for MultiplexedConnection {
842    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
843        (async move { self.send_packed_command(cmd).await }).boxed()
844    }
845
846    fn req_packed_commands<'a>(
847        &'a mut self,
848        cmd: &'a crate::Pipeline,
849        offset: usize,
850        count: usize,
851    ) -> RedisFuture<'a, Vec<Value>> {
852        (async move { self.send_packed_commands(cmd, offset, count).await }).boxed()
853    }
854
855    fn get_db(&self) -> i64 {
856        self.db
857    }
858}
859
860impl MultiplexedConnection {
861    /// Subscribes to a new channel(s).    
862    ///
863    /// Updates from the sender will be sent on the push sender that was passed to the connection.
864    /// If the connection was configured without a push sender, the connection won't be able to pass messages back to the user.
865    ///
866    /// This method is only available when the connection is using RESP3 protocol, and will return an error otherwise.
867    ///
868    /// ```rust,no_run
869    /// # async fn func() -> redis::RedisResult<()> {
870    /// let client = redis::Client::open("redis://127.0.0.1/?protocol=resp3").unwrap();
871    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
872    /// let config = redis::AsyncConnectionConfig::new().set_push_sender(tx);
873    /// let mut con = client.get_multiplexed_async_connection_with_config(&config).await?;
874    /// con.subscribe(&["channel_1", "channel_2"]).await?;
875    /// # Ok(()) }
876    /// ```
877    pub async fn subscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
878        check_resp3!(self.protocol);
879        let mut cmd = cmd("SUBSCRIBE");
880        cmd.arg(channel_name);
881        cmd.exec_async(self).await?;
882        Ok(())
883    }
884
885    /// Unsubscribes from channel(s).
886    ///
887    /// This method is only available when the connection is using RESP3 protocol, and will return an error otherwise.
888    ///
889    /// ```rust,no_run
890    /// # async fn func() -> redis::RedisResult<()> {
891    /// let client = redis::Client::open("redis://127.0.0.1/?protocol=resp3").unwrap();
892    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
893    /// let config = redis::AsyncConnectionConfig::new().set_push_sender(tx);
894    /// let mut con = client.get_multiplexed_async_connection_with_config(&config).await?;
895    /// con.subscribe(&["channel_1", "channel_2"]).await?;
896    /// con.unsubscribe(&["channel_1", "channel_2"]).await?;
897    /// # Ok(()) }
898    /// ```
899    pub async fn unsubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
900        check_resp3!(self.protocol);
901        let mut cmd = cmd("UNSUBSCRIBE");
902        cmd.arg(channel_name);
903        cmd.exec_async(self).await?;
904        Ok(())
905    }
906
907    /// Subscribes to new channel(s) with pattern(s).
908    ///
909    /// Updates from the sender will be sent on the push sender that was passed to the connection.
910    /// If the connection was configured without a push sender, the connection won't be able to pass messages back to the user.
911    ///
912    /// This method is only available when the connection is using RESP3 protocol, and will return an error otherwise.
913    ///
914    /// ```rust,no_run
915    /// # async fn func() -> redis::RedisResult<()> {
916    /// let client = redis::Client::open("redis://127.0.0.1/?protocol=resp3").unwrap();
917    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
918    /// let config = redis::AsyncConnectionConfig::new().set_push_sender(tx);
919    /// let mut con = client.get_multiplexed_async_connection_with_config(&config).await?;
920    /// con.psubscribe("channel*_1").await?;
921    /// con.psubscribe(&["channel*_2", "channel*_3"]).await?;
922    /// # Ok(())
923    /// # }
924    /// ```
925    pub async fn psubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> {
926        check_resp3!(self.protocol);
927        let mut cmd = cmd("PSUBSCRIBE");
928        cmd.arg(channel_pattern);
929        cmd.exec_async(self).await?;
930        Ok(())
931    }
932
933    /// Unsubscribes from channel pattern(s).
934    ///
935    /// This method is only available when the connection is using RESP3 protocol, and will return an error otherwise.
936    pub async fn punsubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> {
937        check_resp3!(self.protocol);
938        let mut cmd = cmd("PUNSUBSCRIBE");
939        cmd.arg(channel_pattern);
940        cmd.exec_async(self).await?;
941        Ok(())
942    }
943}
944
#[cfg(feature = "token-based-authentication")]
impl MultiplexedConnection {
    /// Re-authenticate the connection with new credentials
    ///
    /// This method allows existing async connections to update their authentication
    /// when tokens are refreshed, enabling streaming credential updates.
    async fn re_authenticate_with_credentials(
        &mut self,
        credentials: &crate::auth::BasicAuth,
    ) -> RedisResult<()> {
        let auth_cmd =
            crate::connection::authenticate_cmd(Some(&credentials.username), &credentials.password);
        // An error reply to AUTH arrives as a successfully-read Value;
        // `extract_error` converts such a reply into an `Err`.
        let reply = self.send_packed_command(&auth_cmd).await?;
        reply.extract_error()?;
        Ok(())
    }
}
964
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pipeline_resolve_buffer_size_default() {
        // With no explicit size configured, the pipeline falls back to 50.
        let resolved = Pipeline::resolve_buffer_size(None);
        assert_eq!(resolved, 50);
    }

    #[test]
    fn test_pipeline_resolve_buffer_size_custom() {
        // An explicitly configured size is used as-is.
        let resolved = Pipeline::resolve_buffer_size(Some(100));
        assert_eq!(resolved, 100);
    }
}