1use super::{AsyncPushSender, ConnectionLike, Runtime, SharedHandleContainer, TaskHandle};
2#[cfg(feature = "cache-aio")]
3use crate::caching::{CacheManager, CacheStatistics, PrepareCacheResult};
4use crate::{
5 AsyncConnectionConfig, ProtocolVersion, PushInfo, RedisConnectionInfo, ServerError,
6 ToRedisArgs,
7 aio::setup_connection,
8 check_resp3, cmd,
9 cmd::Cmd,
10 errors::{RedisError, closed_connection_error},
11 parser::ValueCodec,
12 types::{RedisFuture, RedisResult, Value},
13};
14use ::tokio::{
15 io::{AsyncRead, AsyncWrite},
16 sync::{mpsc, oneshot},
17};
18#[cfg(feature = "token-based-authentication")]
19use {
20 crate::errors::ErrorKind,
21 arcstr::ArcStr,
22 log::{debug, error},
23 std::sync::atomic::{AtomicBool, Ordering},
24};
25
26use futures_util::{
27 future::{Future, FutureExt},
28 ready,
29 sink::Sink,
30 stream::{self, Stream, StreamExt},
31};
32use pin_project_lite::pin_project;
33use std::collections::VecDeque;
34use std::fmt;
35use std::fmt::Debug;
36use std::pin::Pin;
37use std::sync::Arc;
38use std::task::{self, Poll};
39use std::time::Duration;
40use tokio_util::codec::Decoder;
41
42type PipelineOutput = oneshot::Sender<RedisResult<Value>>;
44
45enum ErrorOrErrors {
46 Errors(Vec<(usize, ServerError)>),
47 FirstError(RedisError),
49}
50
51enum ResponseAggregate {
52 SingleCommand,
53 Pipeline {
54 buffer: Vec<Value>,
55 error_or_errors: ErrorOrErrors,
56 expectation: PipelineResponseExpectation,
57 },
58}
59
60struct PipelineResponseExpectation {
62 skipped_response_count: usize,
64 expected_response_count: usize,
66 is_transaction: bool,
68 seen_responses: usize,
69}
70
71impl ResponseAggregate {
72 fn new(expectation: Option<PipelineResponseExpectation>) -> Self {
73 match expectation {
74 Some(expectation) => ResponseAggregate::Pipeline {
75 buffer: Vec::new(),
76 error_or_errors: ErrorOrErrors::Errors(Vec::new()),
77 expectation,
78 },
79 None => ResponseAggregate::SingleCommand,
80 }
81 }
82}
83
84struct InFlight {
85 output: Option<PipelineOutput>,
86 response_aggregate: ResponseAggregate,
87}
88
89struct PipelineMessage {
91 input: Vec<u8>,
92 output: Option<PipelineOutput>,
94 expectation: Option<PipelineResponseExpectation>,
98}
99
100#[derive(Clone)]
105struct Pipeline {
106 sender: mpsc::Sender<PipelineMessage>,
107}
108
109impl Debug for Pipeline {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 f.debug_tuple("Pipeline").field(&self.sender).finish()
112 }
113}
114
115#[cfg(feature = "cache-aio")]
116pin_project! {
117 struct PipelineSink<T> {
118 #[pin]
119 sink_stream: T,
120 in_flight: VecDeque<InFlight>,
121 error: Option<RedisError>,
122 push_sender: Option<Arc<dyn AsyncPushSender>>,
123 cache_manager: Option<CacheManager>,
124 }
125}
126
127#[cfg(not(feature = "cache-aio"))]
128pin_project! {
129 struct PipelineSink<T> {
130 #[pin]
131 sink_stream: T,
132 in_flight: VecDeque<InFlight>,
133 error: Option<RedisError>,
134 push_sender: Option<Arc<dyn AsyncPushSender>>,
135 }
136}
137
138fn send_push(push_sender: &Option<Arc<dyn AsyncPushSender>>, info: PushInfo) {
139 if let Some(sender) = push_sender {
140 let _ = sender.send(info);
141 };
142}
143
144pub(crate) fn send_disconnect(push_sender: &Option<Arc<dyn AsyncPushSender>>) {
145 send_push(push_sender, PushInfo::disconnect());
146}
147
148impl<T> PipelineSink<T>
149where
150 T: Stream<Item = RedisResult<Value>> + 'static,
151{
152 fn new(
153 sink_stream: T,
154 push_sender: Option<Arc<dyn AsyncPushSender>>,
155 #[cfg(feature = "cache-aio")] cache_manager: Option<CacheManager>,
156 ) -> Self
157 where
158 T: Sink<Vec<u8>, Error = RedisError> + Stream<Item = RedisResult<Value>> + 'static,
159 {
160 PipelineSink {
161 sink_stream,
162 in_flight: VecDeque::new(),
163 error: None,
164 push_sender,
165 #[cfg(feature = "cache-aio")]
166 cache_manager,
167 }
168 }
169
170 fn poll_read(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Result<(), ()>> {
172 loop {
173 let item = ready!(self.as_mut().project().sink_stream.poll_next(cx));
174 let item = match item {
175 Some(result) => result,
176 None => Err(closed_connection_error()),
178 };
179
180 let is_unrecoverable = item.as_ref().is_err_and(|err| err.is_unrecoverable_error());
181 self.as_mut().send_result(item);
182 if is_unrecoverable {
183 let self_ = self.project();
184 send_disconnect(self_.push_sender);
185 return Poll::Ready(Err(()));
186 }
187 }
188 }
189
190 fn send_result(self: Pin<&mut Self>, result: RedisResult<Value>) {
191 let self_ = self.project();
192 let result = match result {
193 Ok(Value::Push { kind, data }) if !kind.has_reply() => {
195 #[cfg(feature = "cache-aio")]
196 if let Some(cache_manager) = &self_.cache_manager {
197 cache_manager.handle_push_value(&kind, &data);
198 }
199 send_push(self_.push_sender, PushInfo { kind, data });
200
201 return;
202 }
203 Ok(Value::Push { kind, data }) if kind.has_reply() => {
205 send_push(
206 self_.push_sender,
207 PushInfo {
208 kind: kind.clone(),
209 data: data.clone(),
210 },
211 );
212 Ok(Value::Push { kind, data })
213 }
214 _ => result,
215 };
216
217 let mut entry = match self_.in_flight.pop_front() {
218 Some(entry) => entry,
219 None => return,
220 };
221
222 match &mut entry.response_aggregate {
223 ResponseAggregate::SingleCommand => {
224 if let Some(output) = entry.output.take() {
225 _ = output.send(result);
226 }
227 }
228 ResponseAggregate::Pipeline {
229 buffer,
230 error_or_errors,
231 expectation:
232 PipelineResponseExpectation {
233 expected_response_count,
234 skipped_response_count,
235 is_transaction,
236 seen_responses,
237 },
238 } => {
239 *seen_responses += 1;
240 if *skipped_response_count > 0 {
241 if *is_transaction {
244 if let ErrorOrErrors::Errors(errs) = error_or_errors {
245 match result {
246 Ok(Value::ServerError(err)) => {
247 errs.push((*seen_responses - 2, err)); }
249 Err(err) => *error_or_errors = ErrorOrErrors::FirstError(err),
250 _ => {}
251 }
252 }
253 }
254
255 *skipped_response_count -= 1;
256 self_.in_flight.push_front(entry);
257 return;
258 }
259
260 match result {
261 Ok(item) => {
262 buffer.push(item);
263 }
264 Err(err) => {
265 if matches!(error_or_errors, ErrorOrErrors::Errors(_)) {
266 *error_or_errors = ErrorOrErrors::FirstError(err)
267 }
268 }
269 }
270
271 if buffer.len() < *expected_response_count {
272 self_.in_flight.push_front(entry);
274 return;
275 }
276
277 let response =
278 match std::mem::replace(error_or_errors, ErrorOrErrors::Errors(Vec::new())) {
279 ErrorOrErrors::Errors(errors) => {
280 if errors.is_empty() {
281 Ok(Value::Array(std::mem::take(buffer)))
282 } else {
283 Err(RedisError::make_aborted_transaction(errors))
284 }
285 }
286 ErrorOrErrors::FirstError(redis_error) => Err(redis_error),
287 };
288
289 if let Some(output) = entry.output.take() {
293 _ = output.send(response);
294 }
295 }
296 }
297 }
298}
299
300impl<T> Sink<PipelineMessage> for PipelineSink<T>
301where
302 T: Sink<Vec<u8>, Error = RedisError> + Stream<Item = RedisResult<Value>> + 'static,
303{
304 type Error = ();
305
306 fn poll_ready(
308 mut self: Pin<&mut Self>,
309 cx: &mut task::Context,
310 ) -> Poll<Result<(), Self::Error>> {
311 match ready!(self.as_mut().project().sink_stream.poll_ready(cx)) {
312 Ok(()) => Ok(()).into(),
313 Err(err) => {
314 *self.project().error = Some(err);
315 Ok(()).into()
316 }
317 }
318 }
319
320 fn start_send(
321 mut self: Pin<&mut Self>,
322 PipelineMessage {
323 input,
324 mut output,
325 expectation,
326 }: PipelineMessage,
327 ) -> Result<(), Self::Error> {
328 if output.as_ref().is_some_and(|output| output.is_closed()) {
332 return Ok(());
333 }
334
335 let self_ = self.as_mut().project();
336
337 if let Some(err) = self_.error.take() {
338 if let Some(output) = output.take() {
339 _ = output.send(Err(err));
340 }
341 return Err(());
342 }
343
344 match self_.sink_stream.start_send(input) {
345 Ok(()) => {
346 let response_aggregate = ResponseAggregate::new(expectation);
347 let entry = InFlight {
348 output,
349 response_aggregate,
350 };
351
352 self_.in_flight.push_back(entry);
353 Ok(())
354 }
355 Err(err) => {
356 if let Some(output) = output.take() {
357 _ = output.send(Err(err));
358 }
359 Err(())
360 }
361 }
362 }
363
364 fn poll_flush(
365 mut self: Pin<&mut Self>,
366 cx: &mut task::Context,
367 ) -> Poll<Result<(), Self::Error>> {
368 ready!(
369 self.as_mut()
370 .project()
371 .sink_stream
372 .poll_flush(cx)
373 .map_err(|err| {
374 self.as_mut().send_result(Err(err));
375 })
376 )?;
377 self.poll_read(cx)
378 }
379
380 fn poll_close(
381 mut self: Pin<&mut Self>,
382 cx: &mut task::Context,
383 ) -> Poll<Result<(), Self::Error>> {
384 if !self.in_flight.is_empty() {
387 ready!(self.as_mut().poll_flush(cx))?;
388 }
389 let this = self.as_mut().project();
390 this.sink_stream.poll_close(cx).map_err(|err| {
391 self.send_result(Err(err));
392 })
393 }
394}
395
396impl Pipeline {
397 const DEFAULT_BUFFER_SIZE: usize = 50;
398
399 fn resolve_buffer_size(size: Option<usize>) -> usize {
400 size.unwrap_or(Self::DEFAULT_BUFFER_SIZE)
401 }
402
403 fn new<T>(
404 sink_stream: T,
405 push_sender: Option<Arc<dyn AsyncPushSender>>,
406 #[cfg(feature = "cache-aio")] cache_manager: Option<CacheManager>,
407 buffer_size: usize,
408 ) -> (Self, impl Future<Output = ()>)
409 where
410 T: Sink<Vec<u8>, Error = RedisError>,
411 T: Stream<Item = RedisResult<Value>>,
412 T: Unpin + Send + 'static,
413 {
414 let (sender, mut receiver) = mpsc::channel(buffer_size);
415
416 let sink = PipelineSink::new(
417 sink_stream,
418 push_sender,
419 #[cfg(feature = "cache-aio")]
420 cache_manager,
421 );
422 let f = stream::poll_fn(move |cx| receiver.poll_recv(cx))
423 .map(Ok)
424 .forward(sink)
425 .map(|_| ());
426 (Pipeline { sender }, f)
427 }
428
429 async fn send_recv(
430 &mut self,
431 input: Vec<u8>,
432 expectation: Option<PipelineResponseExpectation>,
435 timeout: Option<Duration>,
436 skip_response: bool,
437 ) -> Result<Value, RedisError> {
438 if input.is_empty() {
439 return Err(RedisError::make_empty_command());
440 }
441
442 let request = async {
443 if skip_response {
444 self.sender
445 .send(PipelineMessage {
446 input,
447 expectation,
448 output: None,
449 })
450 .await
451 .map_err(|_| None)?;
452
453 return Ok(Value::Nil);
454 }
455
456 let (sender, receiver) = oneshot::channel();
457
458 self.sender
459 .send(PipelineMessage {
460 input,
461 expectation,
462 output: Some(sender),
463 })
464 .await
465 .map_err(|_| None)?;
466
467 receiver.await
468 .map_err(|_| None)
471 .and_then(|res| res.map_err(Some))
472 };
473
474 match timeout {
475 Some(timeout) => match Runtime::locate().timeout(timeout, request).await {
476 Ok(res) => res,
477 Err(elapsed) => Err(Some(elapsed.into())),
478 },
479 None => request.await,
480 }
481 .map_err(|err| err.unwrap_or_else(closed_connection_error))
482 }
483}
484
485#[derive(Clone)]
498pub struct MultiplexedConnection {
499 pipeline: Pipeline,
500 db: i64,
501 response_timeout: Option<Duration>,
502 protocol: ProtocolVersion,
503 _task_handle: Option<SharedHandleContainer>,
507 #[cfg(feature = "cache-aio")]
508 pub(crate) cache_manager: Option<CacheManager>,
509 #[cfg(feature = "token-based-authentication")]
510 _credentials_subscription_task_handle: Option<SharedHandleContainer>,
513 #[cfg(feature = "token-based-authentication")]
516 re_authentication_failed: Arc<AtomicBool>,
517}
518
519impl Debug for MultiplexedConnection {
520 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
521 let MultiplexedConnection {
522 pipeline,
523 db,
524 response_timeout,
525 protocol,
526 _task_handle,
527 #[cfg(feature = "cache-aio")]
528 cache_manager: _,
529 #[cfg(feature = "token-based-authentication")]
530 _credentials_subscription_task_handle: _,
531 #[cfg(feature = "token-based-authentication")]
532 re_authentication_failed: _,
533 } = self;
534
535 f.debug_struct("MultiplexedConnection")
536 .field("pipeline", &pipeline)
537 .field("db", &db)
538 .field("response_timeout", &response_timeout)
539 .field("protocol", &protocol)
540 .finish()
541 }
542}
543
544impl MultiplexedConnection {
545 pub async fn new<C>(
548 connection_info: &RedisConnectionInfo,
549 stream: C,
550 ) -> RedisResult<(Self, impl Future<Output = ()>)>
551 where
552 C: Unpin + AsyncRead + AsyncWrite + Send + 'static,
553 {
554 Self::new_with_config(connection_info, stream, AsyncConnectionConfig::default()).await
555 }
556
557 pub async fn new_with_config<C>(
560 connection_info: &RedisConnectionInfo,
561 stream: C,
562 config: AsyncConnectionConfig,
563 ) -> RedisResult<(Self, impl Future<Output = ()> + 'static)>
564 where
565 C: Unpin + AsyncRead + AsyncWrite + Send + 'static,
566 {
567 let mut codec = ValueCodec::default().framed(stream);
568 if config.push_sender.is_some() {
569 check_resp3!(
570 connection_info.protocol,
571 "Can only pass push sender to a connection using RESP3"
572 );
573 }
574
575 #[cfg(feature = "cache-aio")]
576 let cache_config = config.cache.as_ref().map(|cache| match cache {
577 crate::client::Cache::Config(cache_config) => *cache_config,
578 #[cfg(any(feature = "connection-manager", feature = "cluster-async"))]
579 crate::client::Cache::Manager(cache_manager) => cache_manager.cache_config,
580 });
581 #[cfg(feature = "cache-aio")]
582 let cache_manager_opt = config
583 .cache
584 .map(|cache| {
585 check_resp3!(
586 connection_info.protocol,
587 "Can only enable client side caching in a connection using RESP3"
588 );
589 match cache {
590 crate::client::Cache::Config(cache_config) => {
591 Ok(CacheManager::new(cache_config))
592 }
593 #[cfg(any(feature = "connection-manager", feature = "cluster-async"))]
594 crate::client::Cache::Manager(cache_manager) => Ok(cache_manager),
595 }
596 })
597 .transpose()?;
598
599 #[cfg(feature = "token-based-authentication")]
600 let mut connection_info = connection_info.clone();
601 #[cfg(not(feature = "token-based-authentication"))]
602 let connection_info = connection_info.clone();
603
604 #[cfg(feature = "token-based-authentication")]
605 if let Some(ref credentials_provider) = config.credentials_provider {
606 match credentials_provider.subscribe().next().await {
608 Some(Ok(credentials)) => {
609 connection_info.username = Some(ArcStr::from(credentials.username));
610 connection_info.password = Some(ArcStr::from(credentials.password));
611 }
612 Some(Err(err)) => {
613 error!("Error while receiving credentials from stream: {err}");
614 return Err(err);
615 }
616 None => {
617 let err = RedisError::from((
618 ErrorKind::AuthenticationFailed,
619 "Credentials stream closed unexpectedly before yielding credentials!",
620 ));
621 error!("{err}");
622 return Err(err);
623 }
624 }
625 }
626
627 setup_connection(
628 &mut codec,
629 &connection_info,
630 #[cfg(feature = "cache-aio")]
631 cache_config,
632 )
633 .await?;
634 if config.push_sender.is_some() {
635 check_resp3!(
636 connection_info.protocol,
637 "Can only pass push sender to a connection using RESP3"
638 );
639 }
640
641 let (pipeline, driver) = Pipeline::new(
642 codec,
643 config.push_sender,
644 #[cfg(feature = "cache-aio")]
645 cache_manager_opt.clone(),
646 Pipeline::resolve_buffer_size(config.pipeline_buffer_size),
647 );
648
649 let con = MultiplexedConnection {
650 pipeline,
651 db: connection_info.db,
652 response_timeout: config.response_timeout,
653 protocol: connection_info.protocol,
654 _task_handle: None,
655 #[cfg(feature = "cache-aio")]
656 cache_manager: cache_manager_opt,
657 #[cfg(feature = "token-based-authentication")]
658 _credentials_subscription_task_handle: None,
659 #[cfg(feature = "token-based-authentication")]
660 re_authentication_failed: Arc::new(AtomicBool::new(false)),
661 };
662
663 #[cfg(feature = "token-based-authentication")]
665 if let Some(streaming_provider) = config.credentials_provider {
666 let mut inner_connection = con.clone();
667 let re_authentication_failed_arc = Arc::clone(&con.re_authentication_failed);
668 let mut stream = streaming_provider.subscribe();
669
670 let subscription_task_handle = Runtime::locate().spawn(async move {
671 let mut error_cause_logged = false;
672 while let Some(result) = stream.next().await {
673 match result {
674 Ok(credentials) => {
675 if let Err(err) = inner_connection
676 .re_authenticate_with_credentials(&credentials)
677 .await
678 {
679 error!("Failed to re-authenticate async connection: {err}.");
680 error_cause_logged = true;
681 re_authentication_failed_arc.store(true, Ordering::Relaxed);
682 break;
683 } else {
684 debug!("Re-authenticated async connection");
685 }
686 }
687 Err(err) => {
688 error!("Credentials stream error for async connection: {err}.");
689 error_cause_logged = true;
690 }
691 }
692 }
693 if !re_authentication_failed_arc.load(Ordering::Relaxed) {
694 if !error_cause_logged {
695 error!("Re-authentication stream ended unexpectedly.");
696 }
697 re_authentication_failed_arc.store(true, Ordering::Relaxed);
698 }
699 });
700 return Ok((
701 Self {
702 _credentials_subscription_task_handle: Some(SharedHandleContainer::new(
703 subscription_task_handle,
704 )),
705 ..con
706 },
707 driver,
708 ));
709 }
710
711 Ok((con, driver))
712 }
713
714 pub(crate) fn set_task_handle(&mut self, handle: TaskHandle) {
717 self._task_handle = Some(SharedHandleContainer::new(handle));
718 }
719
720 pub fn set_response_timeout(&mut self, timeout: std::time::Duration) {
722 self.response_timeout = Some(timeout);
723 }
724
725 pub async fn send_packed_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
728 #[cfg(feature = "token-based-authentication")]
729 if self.re_authentication_failed.load(Ordering::Relaxed) {
730 return Err(RedisError::from((
731 ErrorKind::AuthenticationFailed,
732 "Connection is no longer usable due to re-authentication failure",
733 )));
734 }
735 #[cfg(feature = "cache-aio")]
736 if let Some(cache_manager) = &self.cache_manager {
737 match cache_manager.get_cached_cmd(cmd) {
738 PrepareCacheResult::Cached(value) => return Ok(value),
739 PrepareCacheResult::NotCached(cacheable_command) => {
740 let mut pipeline = crate::Pipeline::new();
741 cacheable_command.pack_command(cache_manager, &mut pipeline);
742
743 let result = self
744 .pipeline
745 .send_recv(
746 pipeline.get_packed_pipeline(),
747 Some(PipelineResponseExpectation {
748 skipped_response_count: 0,
749 expected_response_count: pipeline.commands.len(),
750 is_transaction: false,
751 seen_responses: 0,
752 }),
753 self.response_timeout,
754 cmd.is_no_response(),
755 )
756 .await?;
757 let replies: Vec<Value> = crate::types::from_redis_value(result)?;
758 return cacheable_command.resolve(cache_manager, replies.into_iter());
759 }
760 _ => (),
761 }
762 }
763 self.pipeline
764 .send_recv(
765 cmd.get_packed_command(),
766 None,
767 self.response_timeout,
768 cmd.is_no_response(),
769 )
770 .await
771 }
772
773 pub async fn send_packed_commands(
777 &mut self,
778 cmd: &crate::Pipeline,
779 offset: usize,
780 count: usize,
781 ) -> RedisResult<Vec<Value>> {
782 #[cfg(feature = "token-based-authentication")]
783 if self.re_authentication_failed.load(Ordering::Relaxed) {
784 return Err(RedisError::from((
785 ErrorKind::AuthenticationFailed,
786 "Connection is no longer usable due to re-authentication failure",
787 )));
788 }
789 #[cfg(feature = "cache-aio")]
790 if let Some(cache_manager) = &self.cache_manager {
791 let (cacheable_pipeline, pipeline, (skipped_response_count, expected_response_count)) =
792 cache_manager.get_cached_pipeline(cmd);
793 if pipeline.is_empty() {
794 return cacheable_pipeline.resolve(cache_manager, Value::Array(Vec::new()));
795 }
796 let result = self
797 .pipeline
798 .send_recv(
799 pipeline.get_packed_pipeline(),
800 Some(PipelineResponseExpectation {
801 skipped_response_count,
802 expected_response_count,
803 is_transaction: cacheable_pipeline.transaction_mode,
804 seen_responses: 0,
805 }),
806 self.response_timeout,
807 false,
808 )
809 .await?;
810
811 return cacheable_pipeline.resolve(cache_manager, result);
812 }
813 let value = self
814 .pipeline
815 .send_recv(
816 cmd.get_packed_pipeline(),
817 Some(PipelineResponseExpectation {
818 skipped_response_count: offset,
819 expected_response_count: count,
820 is_transaction: cmd.is_transaction(),
821 seen_responses: 0,
822 }),
823 self.response_timeout,
824 false,
825 )
826 .await?;
827 match value {
828 Value::Array(values) => Ok(values),
829 _ => Ok(vec![value]),
830 }
831 }
832
833 #[cfg(feature = "cache-aio")]
835 #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
836 pub fn get_cache_statistics(&self) -> Option<CacheStatistics> {
837 self.cache_manager.as_ref().map(|cm| cm.statistics())
838 }
839}
840
841impl ConnectionLike for MultiplexedConnection {
842 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
843 (async move { self.send_packed_command(cmd).await }).boxed()
844 }
845
846 fn req_packed_commands<'a>(
847 &'a mut self,
848 cmd: &'a crate::Pipeline,
849 offset: usize,
850 count: usize,
851 ) -> RedisFuture<'a, Vec<Value>> {
852 (async move { self.send_packed_commands(cmd, offset, count).await }).boxed()
853 }
854
855 fn get_db(&self) -> i64 {
856 self.db
857 }
858}
859
860impl MultiplexedConnection {
861 pub async fn subscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
878 check_resp3!(self.protocol);
879 let mut cmd = cmd("SUBSCRIBE");
880 cmd.arg(channel_name);
881 cmd.exec_async(self).await?;
882 Ok(())
883 }
884
885 pub async fn unsubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
900 check_resp3!(self.protocol);
901 let mut cmd = cmd("UNSUBSCRIBE");
902 cmd.arg(channel_name);
903 cmd.exec_async(self).await?;
904 Ok(())
905 }
906
907 pub async fn psubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> {
926 check_resp3!(self.protocol);
927 let mut cmd = cmd("PSUBSCRIBE");
928 cmd.arg(channel_pattern);
929 cmd.exec_async(self).await?;
930 Ok(())
931 }
932
933 pub async fn punsubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> {
937 check_resp3!(self.protocol);
938 let mut cmd = cmd("PUNSUBSCRIBE");
939 cmd.arg(channel_pattern);
940 cmd.exec_async(self).await?;
941 Ok(())
942 }
943}
944
945#[cfg(feature = "token-based-authentication")]
946impl MultiplexedConnection {
947 async fn re_authenticate_with_credentials(
952 &mut self,
953 credentials: &crate::auth::BasicAuth,
954 ) -> RedisResult<()> {
955 let auth_cmd =
956 crate::connection::authenticate_cmd(Some(&credentials.username), &credentials.password);
957 self.send_packed_command(&auth_cmd)
959 .await?
960 .extract_error()
961 .map(|_| ())
962 }
963}
964
965#[cfg(test)]
966mod tests {
967 use super::*;
968
969 #[test]
970 fn test_pipeline_resolve_buffer_size_default() {
971 assert_eq!(Pipeline::resolve_buffer_size(None), 50);
972 }
973
974 #[test]
975 fn test_pipeline_resolve_buffer_size_custom() {
976 assert_eq!(Pipeline::resolve_buffer_size(Some(100)), 100);
977 }
978}