Skip to main content

redis/
cmd.rs

1#[cfg(feature = "aio")]
2use futures_util::{
3    Stream, StreamExt,
4    future::BoxFuture,
5    task::{Context, Poll},
6};
7#[cfg(feature = "aio")]
8use std::pin::Pin;
9#[cfg(feature = "cache-aio")]
10use std::time::Duration;
11use std::{fmt, io, io::Write};
12
13use crate::pipeline::Pipeline;
14use crate::types::{FromRedisValue, RedisResult, RedisWrite, ToRedisArgs, from_redis_value};
15use crate::{ParsingError, connection::ConnectionLike};
16
/// An argument to a redis command.
///
/// The payload type `D` is carried only by [`Arg::Simple`]; a cursor
/// argument has no payload of its own.
#[derive(Clone, PartialEq, Debug)]
#[non_exhaustive]
pub enum Arg<D> {
    /// A normal argument, carrying its payload.
    Simple(D),
    /// A cursor argument created from `cursor_arg()`
    Cursor,
}
26
/// CommandCacheConfig is used to define caching behaviour of individual commands.
///
/// A configuration is attached to a command with `Cmd::set_cache_config`.
/// # Example
/// ```rust
/// use std::time::Duration;
/// use redis::{CommandCacheConfig, Cmd};
///
/// let ttl = Duration::from_secs(120); // 2 minutes TTL
/// let config = CommandCacheConfig::new()
///     .set_enable_cache(true)
///     .set_client_side_ttl(ttl);
/// let command = Cmd::new().arg("GET").arg("key").set_cache_config(config);
/// ```
#[cfg(feature = "cache-aio")]
#[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
#[derive(Clone, Debug)]
pub struct CommandCacheConfig {
    // Whether caching is enabled for the command; `new()` starts with `true`.
    pub(crate) enable_cache: bool,
    // Optional client side TTL; stays `None` until `set_client_side_ttl` is called.
    pub(crate) client_side_ttl: Option<Duration>,
}
46
#[cfg(feature = "cache-aio")]
impl CommandCacheConfig {
    /// Builds a configuration that has caching enabled and no client side TTL.
    pub fn new() -> Self {
        let enable_cache = true;
        let client_side_ttl = None;
        Self {
            enable_cache,
            client_side_ttl,
        }
    }

    /// Turns caching on or off for the command.
    /// Disabling cache for specific command when using [crate::caching::CacheMode::All] will not work.
    pub fn set_enable_cache(self, enable_cache: bool) -> Self {
        Self {
            enable_cache,
            ..self
        }
    }

    /// Installs a custom client side time to live (TTL).
    pub fn set_client_side_ttl(self, client_side_ttl: Duration) -> Self {
        Self {
            client_side_ttl: Some(client_side_ttl),
            ..self
        }
    }
}
#[cfg(feature = "cache-aio")]
impl Default for CommandCacheConfig {
    /// The default configuration is the one produced by [`CommandCacheConfig::new`].
    fn default() -> Self {
        CommandCacheConfig::new()
    }
}
76
/// Represents redis commands.
///
/// The bytes of all simple arguments are stored back to back in `data`,
/// while `args` records, per argument, where it ends inside `data`.
#[derive(Clone)]
pub struct Cmd {
    // Concatenated bytes of every simple argument, in the order they were written.
    pub(crate) data: Vec<u8>,
    // Arg::Simple contains the offset that marks the end of the argument
    args: Vec<Arg<usize>>,
    // Current cursor value; `None` until `cursor_arg` is called or an iterator stores one.
    cursor: Option<u64>,
    // If it's true command's response won't be read from socket. Useful for Pub/Sub.
    no_response: bool,
    // NOTE(review): never read in this file; presumably consulted by the connection layer — confirm.
    pub(crate) skip_concurrency_limit: bool,
    // Per-command caching override installed through `set_cache_config`.
    #[cfg(feature = "cache-aio")]
    cache: Option<CommandCacheConfig>,
}
90
91impl std::fmt::Debug for Cmd {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        let mut debug_struct = f.debug_struct("Cmd");
94        debug_struct
95            .field("data", &String::from_utf8_lossy(&self.data).as_ref())
96            .field("args", &self.args)
97            .field("cursor", &self.cursor)
98            .field("no_response", &self.no_response);
99
100        #[cfg(feature = "cache-aio")]
101        debug_struct.field("cache", &self.cache);
102
103        debug_struct.finish()
104    }
105}
106
/// Represents a redis iterator.
///
/// This is a thin public wrapper; iteration is delegated to the private
/// `CheckedIter` it contains.
pub struct Iter<'a, T: FromRedisValue> {
    // The iterator that does the actual work.
    iter: CheckedIter<'a, T>,
}
111impl<T: FromRedisValue> Iterator for Iter<'_, T> {
112    type Item = RedisResult<T>;
113
114    #[inline]
115    fn next(&mut self) -> Option<RedisResult<T>> {
116        self.iter.next()
117    }
118}
119
/// Represents a safe(r) redis iterator.
struct CheckedIter<'a, T: FromRedisValue> {
    // Items of the most recently fetched batch that have not been handed out yet.
    batch: std::vec::IntoIter<Result<T, ParsingError>>,
    // Connection used to request further batches.
    con: &'a mut (dyn ConnectionLike + 'a),
    // Command that is re-sent for every batch; its `cursor` field tracks the position.
    cmd: Cmd,
}
126
127impl<T: FromRedisValue> Iterator for CheckedIter<'_, T> {
128    type Item = RedisResult<T>;
129
130    #[inline]
131    fn next(&mut self) -> Option<RedisResult<T>> {
132        // we need to do this in a loop until we produce at least one item
133        // or we find the actual end of the iteration.  This is necessary
134        // because with filtering an iterator it is possible that a whole
135        // chunk is not matching the pattern and thus yielding empty results.
136        loop {
137            if let Some(value) = self.batch.next() {
138                return Some(value.map_err(|err| err.into()));
139            };
140
141            if self.cmd.cursor? == 0 {
142                return None;
143            }
144
145            let (cursor, batch) = match self
146                .con
147                .req_packed_command(&self.cmd.get_packed_command())
148                .and_then(|val| Ok(from_redis_value::<(u64, _)>(val)?))
149            {
150                Ok((cursor, values)) => (cursor, T::from_each_redis_values(values)),
151                Err(e) => return Some(Err(e)),
152            };
153
154            self.cmd.cursor = Some(cursor);
155            self.batch = batch.into_iter();
156        }
157    }
158}
159
160#[cfg(feature = "aio")]
161use crate::aio::ConnectionLike as AsyncConnection;
162
/// The inner future of AsyncIter
///
/// Holds everything needed to produce the next item: the buffered batch,
/// the connection and the command that is re-sent for each batch.
#[cfg(feature = "aio")]
struct AsyncIterInner<'a, T: FromRedisValue + 'a> {
    // Items of the most recently fetched batch that have not been handed out yet.
    batch: std::vec::IntoIter<Result<T, ParsingError>>,
    // Async connection used to request further batches.
    con: &'a mut (dyn AsyncConnection + Send + 'a),
    // Command that is re-sent for every batch; its `cursor` field tracks the position.
    cmd: Cmd,
}
170
/// Represents the state of AsyncIter
#[cfg(feature = "aio")]
enum IterOrFuture<'a, T: FromRedisValue + 'a> {
    // Idle: the iterator is available and no request is in flight.
    Iter(AsyncIterInner<'a, T>),
    // A `next_item` call is in flight; it hands the iterator back together with the item.
    Future(BoxFuture<'a, (AsyncIterInner<'a, T>, Option<RedisResult<T>>)>),
    // Placeholder left behind while `poll_next` has moved the real state out.
    Empty,
}
178
/// Represents a redis iterator that can be used with async connections.
///
/// Items can be pulled with `next_item` or through the `Stream` implementation.
#[cfg(feature = "aio")]
pub struct AsyncIter<'a, T: FromRedisValue + 'a> {
    // Either the idle iterator or the future currently producing the next item.
    inner: IterOrFuture<'a, T>,
}
184
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + 'a> AsyncIterInner<'a, T> {
    /// Yields the next buffered item, fetching further batches from the
    /// server while the cursor is non-zero.
    async fn next_item(&mut self) -> Option<RedisResult<T>> {
        // A filtered scan may return whole batches without a single match,
        // so keep fetching until an item shows up or the cursor is exhausted.
        loop {
            if let Some(parsed) = self.batch.next() {
                return Some(parsed.map_err(Into::into));
            }

            // No cursor at all, or a cursor of zero, means there is nothing left to fetch.
            match self.cmd.cursor {
                None | Some(0) => return None,
                Some(_) => {}
            }

            let page = self
                .con
                .req_packed_command(&self.cmd)
                .await
                .and_then(|val| Ok(from_redis_value::<(u64, _)>(val)?));

            let (next_cursor, fresh) = match page {
                Ok((next_cursor, items)) => (next_cursor, T::from_each_redis_values(items)),
                Err(err) => return Some(Err(err)),
            };

            self.cmd.cursor = Some(next_cursor);
            self.batch = fresh.into_iter();
        }
    }
}
216
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + 'a + Unpin + Send> AsyncIter<'a, T> {
    /// Returns the next item of the iterator, or `None` once it is exhausted.
    ///
    /// This simply awaits the next element of the `Stream` implementation.
    ///
    /// ```rust,no_run
    /// # use redis::AsyncCommands;
    /// # async fn scan_set() -> redis::RedisResult<()> {
    /// # let client = redis::Client::open("redis://127.0.0.1/")?;
    /// # let mut con = client.get_multiplexed_async_connection().await?;
    /// let _: () = con.sadd("my_set", 42i32).await?;
    /// let _: () = con.sadd("my_set", 43i32).await?;
    /// let mut iter: redis::AsyncIter<i32> = con.sscan("my_set").await?;
    /// while let Some(element) = iter.next_item().await {
    ///     let element = element?;
    ///     assert!(element == 42 || element == 43);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[inline]
    pub async fn next_item(&mut self) -> Option<RedisResult<T>> {
        <Self as StreamExt>::next(self).await
    }
}
239
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + Unpin + Send + 'a> Stream for AsyncIter<'a, T> {
    type Item = RedisResult<T>;

    /// Polls for the next item.
    ///
    /// When idle, a future computing the next item is created first; that
    /// future (or the one already in flight) is then polled once.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // Move the state out, leaving `Empty` behind until the new state is stored below.
        let mut in_flight: BoxFuture<'a, _> =
            match std::mem::replace(&mut this.inner, IterOrFuture::Empty) {
                IterOrFuture::Future(pending) => pending,
                IterOrFuture::Iter(mut idle) => Box::pin(async move {
                    let item = idle.next_item().await;
                    (idle, item)
                }),
                IterOrFuture::Empty => unreachable!(),
            };

        match in_flight.as_mut().poll(cx) {
            Poll::Ready((idle, item)) => {
                // The future handed the iterator back; park it for the next poll.
                this.inner = IterOrFuture::Iter(idle);
                Poll::Ready(item)
            }
            Poll::Pending => {
                this.inner = IterOrFuture::Future(in_flight);
                Poll::Pending
            }
        }
    }
}
271
/// Returns the number of decimal digits needed to print `v`.
///
/// Zero counts as one digit.
fn countdigits(mut v: usize) -> usize {
    let mut digits = 1;
    // Every division by ten that still leaves something behind adds a digit.
    while v >= 10 {
        v /= 10;
        digits += 1;
    }
    digits
}
292
293#[inline]
294fn bulklen(len: usize) -> usize {
295    1 + countdigits(len) + 2 + len + 2
296}
297
298fn args_len<'a, I>(args: I, cursor: u64) -> usize
299where
300    I: IntoIterator<Item = Arg<&'a [u8]>> + ExactSizeIterator,
301{
302    let mut totlen = 1 + countdigits(args.len()) + 2;
303    for item in args {
304        totlen += bulklen(match item {
305            Arg::Cursor => countdigits(cursor as usize),
306            Arg::Simple(val) => val.len(),
307        });
308    }
309    totlen
310}
311
312pub(crate) fn cmd_len(cmd: &Cmd) -> usize {
313    args_len(cmd.args_iter(), cmd.cursor.unwrap_or(0))
314}
315
316fn encode_command<'a, I>(args: I, cursor: u64) -> Vec<u8>
317where
318    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
319{
320    let mut cmd = Vec::new();
321    write_command_to_vec(&mut cmd, args, cursor);
322    cmd
323}
324
325fn write_command_to_vec<'a, I>(cmd: &mut Vec<u8>, args: I, cursor: u64)
326where
327    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
328{
329    let totlen = args_len(args.clone(), cursor);
330
331    cmd.reserve(totlen);
332
333    write_command(cmd, args, cursor).unwrap()
334}
335
336fn write_command<'a, I>(cmd: &mut (impl ?Sized + Write), args: I, cursor: u64) -> io::Result<()>
337where
338    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
339{
340    let mut buf = ::itoa::Buffer::new();
341
342    cmd.write_all(b"*")?;
343    let s = buf.format(args.len());
344    cmd.write_all(s.as_bytes())?;
345    cmd.write_all(b"\r\n")?;
346
347    let mut cursor_bytes = itoa::Buffer::new();
348    for item in args {
349        let bytes = match item {
350            Arg::Cursor => cursor_bytes.format(cursor).as_bytes(),
351            Arg::Simple(val) => val,
352        };
353
354        cmd.write_all(b"$")?;
355        let s = buf.format(bytes.len());
356        cmd.write_all(s.as_bytes())?;
357        cmd.write_all(b"\r\n")?;
358
359        cmd.write_all(bytes)?;
360        cmd.write_all(b"\r\n")?;
361    }
362    Ok(())
363}
364
365impl RedisWrite for Cmd {
366    fn write_arg(&mut self, arg: &[u8]) {
367        self.data.extend_from_slice(arg);
368        self.args.push(Arg::Simple(self.data.len()));
369    }
370
371    fn write_arg_fmt(&mut self, arg: impl fmt::Display) {
372        write!(self.data, "{arg}").unwrap();
373        self.args.push(Arg::Simple(self.data.len()));
374    }
375
376    fn writer_for_next_arg(&mut self) -> impl Write + '_ {
377        struct CmdBufferedArgGuard<'a>(&'a mut Cmd);
378        impl Drop for CmdBufferedArgGuard<'_> {
379            fn drop(&mut self) {
380                self.0.args.push(Arg::Simple(self.0.data.len()));
381            }
382        }
383        impl Write for CmdBufferedArgGuard<'_> {
384            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
385                self.0.data.extend_from_slice(buf);
386                Ok(buf.len())
387            }
388
389            fn flush(&mut self) -> std::io::Result<()> {
390                Ok(())
391            }
392        }
393
394        CmdBufferedArgGuard(self)
395    }
396
397    fn reserve_space_for_args(&mut self, additional: impl IntoIterator<Item = usize>) {
398        let mut capacity = 0;
399        let mut args = 0;
400        for add in additional {
401            capacity += add;
402            args += 1;
403        }
404        self.data.reserve(capacity);
405        self.args.reserve(args);
406    }
407
408    #[cfg(feature = "bytes")]
409    fn bufmut_for_next_arg(&mut self, capacity: usize) -> impl bytes::BufMut + '_ {
410        self.data.reserve(capacity);
411        struct CmdBufferedArgGuard<'a>(&'a mut Cmd);
412        impl Drop for CmdBufferedArgGuard<'_> {
413            fn drop(&mut self) {
414                self.0.args.push(Arg::Simple(self.0.data.len()));
415            }
416        }
417        unsafe impl bytes::BufMut for CmdBufferedArgGuard<'_> {
418            fn remaining_mut(&self) -> usize {
419                self.0.data.remaining_mut()
420            }
421
422            unsafe fn advance_mut(&mut self, cnt: usize) {
423                unsafe {
424                    self.0.data.advance_mut(cnt);
425                }
426            }
427
428            fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
429                self.0.data.chunk_mut()
430            }
431
432            // Vec specializes these methods, so we do too
433            fn put<T: bytes::buf::Buf>(&mut self, src: T)
434            where
435                Self: Sized,
436            {
437                self.0.data.put(src);
438            }
439
440            fn put_slice(&mut self, src: &[u8]) {
441                self.0.data.put_slice(src);
442            }
443
444            fn put_bytes(&mut self, val: u8, cnt: usize) {
445                self.0.data.put_bytes(val, cnt);
446            }
447        }
448
449        CmdBufferedArgGuard(self)
450    }
451}
452
453impl Default for Cmd {
454    fn default() -> Cmd {
455        Cmd::new()
456    }
457}
458
459/// A command acts as a builder interface to creating encoded redis
460/// requests.  This allows you to easily assemble a packed command
461/// by chaining arguments together.
462///
463/// Basic example:
464///
465/// ```rust
466/// redis::Cmd::new().arg("SET").arg("my_key").arg(42);
467/// ```
468///
469/// There is also a helper function called `cmd` which makes it a
470/// tiny bit shorter:
471///
472/// ```rust
473/// redis::cmd("SET").arg("my_key").arg(42);
474/// ```
475///
476/// Because Rust currently does not have an ideal system
477/// for lifetimes of temporaries, sometimes you need to hold on to
478/// the initially generated command:
479///
480/// ```rust,no_run
481/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
482/// # let mut con = client.get_connection().unwrap();
483/// let mut cmd = redis::cmd("SMEMBERS");
484/// let mut iter : redis::Iter<i32> = cmd.arg("my_set").clone().iter(&mut con).unwrap();
485/// ```
486impl Cmd {
487    /// Creates a new empty command.
488    pub fn new() -> Cmd {
489        Cmd {
490            data: vec![],
491            args: vec![],
492            cursor: None,
493            no_response: false,
494            skip_concurrency_limit: false,
495            #[cfg(feature = "cache-aio")]
496            cache: None,
497        }
498    }
499
500    /// Creates a new empty command, with at least the requested capacity.
501    pub fn with_capacity(arg_count: usize, size_of_data: usize) -> Cmd {
502        Cmd {
503            data: Vec::with_capacity(size_of_data),
504            args: Vec::with_capacity(arg_count),
505            cursor: None,
506            no_response: false,
507            skip_concurrency_limit: false,
508            #[cfg(feature = "cache-aio")]
509            cache: None,
510        }
511    }
512
513    /// Get the capacities for the internal buffers.
514    #[cfg(test)]
515    #[allow(dead_code)]
516    pub(crate) fn capacity(&self) -> (usize, usize) {
517        (self.args.capacity(), self.data.capacity())
518    }
519
520    /// Clears the command, resetting it completely.
521    ///
522    /// This is equivalent to [`Cmd::new`], except the buffer capacity is kept.
523    ///
524    /// # Examples
525    ///
526    /// ```rust,no_run
527    /// # use redis::{Client, Cmd};
528    /// # let client = Client::open("redis://127.0.0.1/").unwrap();
529    /// # let mut con = client.get_connection().expect("Failed to connect to Redis");
530    /// let mut cmd = Cmd::new();
531    /// cmd.arg("SET").arg("foo").arg("42");
532    /// cmd.query::<()>(&mut con).expect("Query failed");
533    /// cmd.clear();
534    /// // This reuses the allocations of the previous command
535    /// cmd.arg("SET").arg("bar").arg("42");
536    /// cmd.query::<()>(&mut con).expect("Query failed");
537    /// ```
538    pub fn clear(&mut self) {
539        self.data.clear();
540        self.args.clear();
541        self.cursor = None;
542        self.no_response = false;
543        self.skip_concurrency_limit = false;
544        #[cfg(feature = "cache-aio")]
545        {
546            self.cache = None;
547        }
548    }
549
550    /// Appends an argument to the command.  The argument passed must
551    /// be a type that implements `ToRedisArgs`.  Most primitive types as
552    /// well as vectors of primitive types implement it.
553    ///
554    /// For instance all of the following are valid:
555    ///
556    /// ```rust,no_run
557    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
558    /// # let mut con = client.get_connection().unwrap();
559    /// redis::cmd("SET").arg(&["my_key", "my_value"]);
560    /// redis::cmd("SET").arg("my_key").arg(42);
561    /// redis::cmd("SET").arg("my_key").arg(b"my_value");
562    /// ```
563    #[inline]
564    pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Cmd {
565        arg.write_redis_args(self);
566        self
567    }
568
569    /// Takes the command out of the mutable reference and returns it as a value
570    ///
571    /// The referenced command is left empty.
572    pub fn take(&mut self) -> Self {
573        std::mem::take(self)
574    }
575
576    /// Works similar to `arg` but adds a cursor argument.
577    ///
578    /// This is always an integer and also flips the command implementation to support a
579    /// different mode for the iterators where the iterator will ask for
580    /// another batch of items when the local data is exhausted.
581    /// Calling this function more than once will overwrite the previous cursor with the latest set value.
582    ///
583    /// ```rust,no_run
584    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
585    /// # let mut con = client.get_connection().unwrap();
586    /// let mut cmd = redis::cmd("SSCAN");
587    /// let mut iter : redis::Iter<isize> =
588    ///     cmd.arg("my_set").cursor_arg(0).clone().iter(&mut con).unwrap();
589    /// for x in iter {
590    ///     // do something with the item
591    /// }
592    /// ```
593    #[inline]
594    pub fn cursor_arg(&mut self, cursor: u64) -> &mut Cmd {
595        self.cursor = Some(cursor);
596        self.args.push(Arg::Cursor);
597        self
598    }
599
600    /// Returns the packed command as a byte vector.
601    ///
602    /// This is a wrapper around [`write_packed_command`] that creates a [`Vec`] to write to.
603    ///
604    /// [`write_packed_command`]: Self::write_packed_command
605    #[inline]
606    pub fn get_packed_command(&self) -> Vec<u8> {
607        let mut cmd = Vec::new();
608        if self.is_empty() {
609            return cmd;
610        }
611        self.write_packed_command(&mut cmd);
612        cmd
613    }
614
615    /// Writes the packed command to `dst`.
616    ///
617    /// This will *append* the packed command.
618    ///
619    /// See also [`get_packed_command`].
620    ///
621    /// [`get_packed_command`]: Self::get_packed_command.
622    #[inline]
623    pub fn write_packed_command(&self, dst: &mut Vec<u8>) {
624        write_command_to_vec(dst, self.args_iter(), self.cursor.unwrap_or(0))
625    }
626
627    pub(crate) fn write_packed_command_preallocated(&self, cmd: &mut Vec<u8>) {
628        write_command(cmd, self.args_iter(), self.cursor.unwrap_or(0)).unwrap()
629    }
630
631    /// Returns true if the command is in scan mode.
632    #[inline]
633    pub fn in_scan_mode(&self) -> bool {
634        self.cursor.is_some()
635    }
636
637    /// Sends the command as query to the connection and converts the
638    /// result to the target redis value.  This is the general way how
639    /// you can retrieve data.
640    #[inline]
641    pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
642        match con.req_command(self) {
643            Ok(val) => Ok(from_redis_value(val.extract_error()?)?),
644            Err(e) => Err(e),
645        }
646    }
647
648    /// Async version of `query`.
649    #[inline]
650    #[cfg(feature = "aio")]
651    pub async fn query_async<T: FromRedisValue>(
652        &self,
653        con: &mut impl crate::aio::ConnectionLike,
654    ) -> RedisResult<T> {
655        let val = con.req_packed_command(self).await?;
656        Ok(from_redis_value(val.extract_error()?)?)
657    }
658
659    /// Sets the cursor and converts the passed value to a batch used by the
660    /// iterators.
661    fn set_cursor_and_get_batch<T: FromRedisValue>(
662        &mut self,
663        value: crate::Value,
664    ) -> RedisResult<Vec<Result<T, ParsingError>>> {
665        let (cursor, values) = if value.looks_like_cursor() {
666            let (cursor, values) = from_redis_value::<(u64, _)>(value)?;
667            (cursor, values)
668        } else {
669            (0, from_redis_value(value)?)
670        };
671
672        self.cursor = Some(cursor);
673
674        Ok(T::from_each_redis_values(values))
675    }
676
677    /// Similar to `query()` but returns an iterator over the items of the
678    /// bulk result or iterator.  In normal mode this is not in any way more
679    /// efficient than just querying into a `Vec<T>` as it's internally
680    /// implemented as buffering into a vector.  This however is useful when
681    /// `cursor_arg` was used in which case the iterator will query for more
682    /// items until the server side cursor is exhausted.
683    ///
684    /// This is useful for commands such as `SSCAN`, `SCAN` and others.
685    ///
686    /// One speciality of this function is that it will check if the response
687    /// looks like a cursor or not and always just looks at the payload.
688    /// This way you can use the function the same for responses in the
689    /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
690    /// tuple of cursor and list).
691    #[inline]
692    pub fn iter<T: FromRedisValue>(
693        mut self,
694        con: &mut dyn ConnectionLike,
695    ) -> RedisResult<Iter<'_, T>> {
696        let rv = con.req_command(&self)?;
697
698        let batch = self.set_cursor_and_get_batch(rv)?;
699
700        Ok(Iter {
701            iter: CheckedIter {
702                batch: batch.into_iter(),
703                con,
704                cmd: self,
705            },
706        })
707    }
708
709    /// Similar to `iter()` but returns an AsyncIter over the items of the
710    /// bulk result or iterator.  A [futures::Stream](https://docs.rs/futures/0.3.3/futures/stream/trait.Stream.html)
711    /// is implemented on AsyncIter. In normal mode this is not in any way more
712    /// efficient than just querying into a `Vec<T>` as it's internally
713    /// implemented as buffering into a vector.  This however is useful when
714    /// `cursor_arg` was used in which case the stream will query for more
715    /// items until the server side cursor is exhausted.
716    ///
717    /// This is useful for commands such as `SSCAN`, `SCAN` and others in async contexts.
718    ///
719    /// One speciality of this function is that it will check if the response
720    /// looks like a cursor or not and always just looks at the payload.
721    /// This way you can use the function the same for responses in the
722    /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
723    /// tuple of cursor and list).
724    #[cfg(feature = "aio")]
725    #[inline]
726    pub async fn iter_async<'a, T: FromRedisValue + 'a>(
727        mut self,
728        con: &'a mut (dyn AsyncConnection + Send),
729    ) -> RedisResult<AsyncIter<'a, T>> {
730        let rv = con.req_packed_command(&self).await?;
731
732        let batch = self.set_cursor_and_get_batch(rv)?;
733
734        Ok(AsyncIter {
735            inner: IterOrFuture::Iter(AsyncIterInner {
736                batch: batch.into_iter(),
737                con,
738                cmd: self,
739            }),
740        })
741    }
742
743    /// This is an alternative to `query`` that can be used if you want to be able to handle a
744    /// command's success or failure but don't care about the command's response. For example,
745    /// this is useful for "SET" commands for which the response's content is not important.
746    /// It avoids the need to define generic bounds for ().
747    #[inline]
748    pub fn exec(&self, con: &mut dyn ConnectionLike) -> RedisResult<()> {
749        self.query::<()>(con)
750    }
751
752    /// This is an alternative to `query_async` that can be used if you want to be able to handle a
753    /// command's success or failure but don't care about the command's response. For example,
754    /// this is useful for "SET" commands for which the response's content is not important.
755    /// It avoids the need to define generic bounds for ().
756    #[cfg(feature = "aio")]
757    pub async fn exec_async(&self, con: &mut impl crate::aio::ConnectionLike) -> RedisResult<()> {
758        self.query_async::<()>(con).await
759    }
760
761    /// Returns an iterator over the arguments in this command (including the command name itself)
762    pub fn args_iter(&self) -> impl Clone + ExactSizeIterator<Item = Arg<&[u8]>> {
763        let mut prev = 0;
764        self.args.iter().map(move |arg| match *arg {
765            Arg::Simple(i) => {
766                let arg = Arg::Simple(&self.data[prev..i]);
767                prev = i;
768                arg
769            }
770
771            Arg::Cursor => Arg::Cursor,
772        })
773    }
774
775    // Get a reference to the argument at `idx`
776    #[cfg(any(feature = "cluster", feature = "cache-aio"))]
777    pub(crate) fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
778        if idx >= self.args.len() {
779            return None;
780        }
781
782        let start = if idx == 0 {
783            0
784        } else {
785            match self.args[idx - 1] {
786                Arg::Simple(n) => n,
787                _ => 0,
788            }
789        };
790        let end = match self.args[idx] {
791            Arg::Simple(n) => n,
792            _ => 0,
793        };
794        if start == 0 && end == 0 {
795            return None;
796        }
797        Some(&self.data[start..end])
798    }
799
800    /// Client won't read and wait for results. Currently only used for Pub/Sub commands in RESP3.
801    ///
802    /// This is mostly set internally. The user can set it if they know that a certain command doesn't return a response, or if they use an async connection and don't want to wait for the server response.
803    /// For sync connections, setting this wrongly can affect the connection's correctness, and should be avoided.
804    #[inline]
805    pub fn set_no_response(&mut self, nr: bool) -> &mut Cmd {
806        self.no_response = nr;
807        self
808    }
809
810    /// Check whether command's result will be waited for.
811    #[inline]
812    pub fn is_no_response(&self) -> bool {
813        self.no_response
814    }
815
816    /// Changes caching behaviour for this specific command.
817    #[cfg(feature = "cache-aio")]
818    #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
819    pub fn set_cache_config(&mut self, command_cache_config: CommandCacheConfig) -> &mut Cmd {
820        self.cache = Some(command_cache_config);
821        self
822    }
823
824    #[cfg(feature = "cache-aio")]
825    #[inline]
826    pub(crate) fn get_cache_config(&self) -> &Option<CommandCacheConfig> {
827        &self.cache
828    }
829
830    pub(crate) fn is_empty(&self) -> bool {
831        self.args.is_empty()
832    }
833}
834
835/// Shortcut function to creating a command with a single argument.
836///
837/// The first argument of a redis command is always the name of the command
838/// which needs to be a string.  This is the recommended way to start a
839/// command pipe.
840///
841/// ```rust
842/// redis::cmd("PING");
843/// ```
844pub fn cmd(name: &str) -> Cmd {
845    let mut rv = Cmd::new();
846    rv.arg(name);
847    rv
848}
849
850/// Packs a bunch of commands into a request.
851///
852/// This is generally a quite useless function as this functionality is
853/// nicely wrapped through the `Cmd` object, but in some cases it can be
854/// useful.  The return value of this can then be send to the low level
855/// `ConnectionLike` methods.
856///
857/// Example:
858///
859/// ```rust
860/// # use redis::ToRedisArgs;
861/// let mut args = vec![];
862/// args.extend("SET".to_redis_args());
863/// args.extend("my_key".to_redis_args());
864/// args.extend(42.to_redis_args());
865/// let cmd = redis::pack_command(&args);
866/// assert_eq!(cmd, b"*3\r\n$3\r\nSET\r\n$6\r\nmy_key\r\n$2\r\n42\r\n".to_vec());
867/// ```
868pub fn pack_command(args: &[Vec<u8>]) -> Vec<u8> {
869    encode_command(args.iter().map(|x| Arg::Simple(&x[..])), 0)
870}
871
872/// Shortcut for creating a new pipeline.
873pub fn pipe() -> Pipeline {
874    Pipeline::new()
875}
876
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(feature = "bytes")]
    use bytes::BufMut;

    /// Renders every argument of `cmd` as text. Only used to build readable
    /// assertion-failure messages.
    fn args_iter_to_str(cmd: &Cmd) -> Vec<String> {
        let mut rendered = Vec::new();
        for arg in cmd.args_iter() {
            let text = match arg {
                Arg::Simple(bytes) => String::from_utf8(bytes.to_vec()).unwrap(),
                Arg::Cursor => "CURSOR".to_string(),
            };
            rendered.push(text);
        }
        rendered
    }

    /// Asserts that both commands yield the same sequence of arguments.
    fn assert_arg_equality(c1: &Cmd, c2: &Cmd) {
        let left_args: Vec<_> = c1.args_iter().collect();
        let right_args: Vec<_> = c2.args_iter().collect();
        assert_eq!(
            left_args,
            right_args,
            "{:?} - {:?}",
            args_iter_to_str(c1),
            args_iter_to_str(c2)
        );
    }

    /// Asserts that both commands pack to the same bytes and expose the same
    /// arguments.
    fn assert_practical_equivalent(c1: Cmd, c2: Cmd) {
        let packed_left = c1.get_packed_command();
        let packed_right = c2.get_packed_command();
        assert_eq!(packed_left, packed_right);
        assert_arg_equality(&c1, &c2);
    }

    #[test]
    fn test_cmd_packed_command_simple_args() {
        let trailing_args: &[&[u8]] = &[b"phone", b"barz"];
        let mut command = cmd("key");
        command.write_arg_fmt("value");
        command.arg(42).arg(trailing_args);

        let packed = command.get_packed_command();
        assert_eq!(cmd_len(&command), packed.len());
        assert_eq!(
            packed,
            b"*5\r\n$3\r\nkey\r\n$5\r\nvalue\r\n$2\r\n42\r\n$5\r\nphone\r\n$4\r\nbarz\r\n",
            "{}",
            String::from_utf8(packed.clone()).unwrap()
        );

        let raw_args: Vec<&[u8]> = vec![b"key", b"value", b"42", b"phone", b"barz"];
        let expected_args: Vec<_> = raw_args.into_iter().map(Arg::Simple).collect();
        let actual_args: Vec<_> = command.args_iter().collect();
        assert_eq!(actual_args, expected_args);
    }

    #[test]
    fn test_cmd_packed_command_with_cursor() {
        let trailing_args: &[&[u8]] = &[b"phone", b"barz"];
        let mut command = cmd("key");
        command.arg("value").arg(42).arg(trailing_args);
        command.cursor_arg(512);

        let packed = command.get_packed_command();
        assert_eq!(cmd_len(&command), packed.len());
        assert_eq!(
            packed,
            b"*6\r\n$3\r\nkey\r\n$5\r\nvalue\r\n$2\r\n42\r\n$5\r\nphone\r\n$4\r\nbarz\r\n$3\r\n512\r\n",
            "{}",
            String::from_utf8(packed.clone()).unwrap()
        );

        // The cursor is reported as a dedicated trailing `Arg::Cursor` entry.
        let raw_args: Vec<&[u8]> = vec![b"key", b"value", b"42", b"phone", b"barz"];
        let mut expected_args: Vec<_> = raw_args.into_iter().map(Arg::Simple).collect();
        expected_args.push(Arg::Cursor);
        let actual_args: Vec<_> = command.args_iter().collect();
        assert_eq!(actual_args, expected_args);
    }

    #[test]
    fn test_cmd_clean() {
        let mut command = cmd("key");
        command.arg("value").cursor_arg(24);
        command.set_no_response(true).clear();

        // Everything should be reset, but the capacity should still be there
        assert!(command.data.is_empty());
        assert!(command.data.capacity() > 0);
        assert!(command.is_empty());
        assert!(command.args.capacity() > 0);
        assert_eq!(command.cursor, None);
        assert!(!command.no_response);
        assert_practical_equivalent(command, Cmd::new());
    }

    #[cfg(feature = "cache-aio")]
    #[test]
    fn test_cmd_clean_cache_aio() {
        let mut command = cmd("key");
        command.arg("value").cursor_arg(24);
        command
            .set_cache_config(crate::CommandCacheConfig::default())
            .set_no_response(true)
            .clear();

        // Everything should be reset, but the capacity should still be there
        assert!(command.data.is_empty());
        assert!(command.data.capacity() > 0);
        assert!(command.is_empty());
        assert!(command.args.capacity() > 0);
        assert_eq!(command.cursor, None);
        assert!(!command.no_response);
        assert!(command.cache.is_none());
    }

    /// A write split across multiple calls to `write` must produce the same
    /// result as a single call to `write_arg`.
    #[test]
    fn test_cmd_writer_for_next_arg() {
        let chunks: [&[u8]; 2] = [b"foo", b"bar"];

        let mut streamed = Cmd::new();
        {
            let mut writer = streamed.writer_for_next_arg();
            for chunk in chunks {
                writer.write_all(chunk).unwrap();
            }
            writer.flush().unwrap();
        }

        let mut direct = Cmd::new();
        direct.write_arg(b"foobar");

        assert_practical_equivalent(streamed, direct);
    }

    /// Multiple writers on the same command must produce the same result as
    /// the corresponding sequence of `write_arg` calls.
    #[test]
    fn test_cmd_writer_for_next_arg_multiple() {
        let chunks_per_arg: [[&[u8]; 2]; 2] = [[b"foo", b"bar"], [b"baz", b"qux"]];

        let mut streamed = Cmd::new();
        for chunks in chunks_per_arg {
            // One writer per argument; it goes out of scope at the end of the
            // iteration, before the next writer is created.
            let mut writer = streamed.writer_for_next_arg();
            for chunk in chunks {
                writer.write_all(chunk).unwrap();
            }
            writer.flush().unwrap();
        }

        let mut direct = Cmd::new();
        direct.write_arg(b"foobar");
        direct.write_arg(b"bazqux");

        assert_practical_equivalent(streamed, direct);
    }

    /// An "empty" write must be equivalent to `write_arg(b"")`.
    #[test]
    fn test_cmd_writer_for_next_arg_empty() {
        let mut streamed = Cmd::new();
        {
            let mut writer = streamed.writer_for_next_arg();
            writer.flush().unwrap();
        }

        let mut direct = Cmd::new();
        direct.write_arg(b"");

        assert_practical_equivalent(streamed, direct);
    }

    /// A write split across multiple `put_slice` calls must produce the same
    /// result as a single call to `write_arg`.
    #[cfg(feature = "bytes")]
    #[test]
    fn test_cmd_bufmut_for_next_arg() {
        let chunks: [&[u8]; 2] = [b"foo", b"bar"];

        let mut buffered = Cmd::new();
        {
            let mut buf = buffered.bufmut_for_next_arg(6);
            for chunk in chunks {
                buf.put_slice(chunk);
            }
        }

        let mut direct = Cmd::new();
        direct.write_arg(b"foobar");

        assert_practical_equivalent(buffered, direct);
    }

    /// Multiple buffers on the same command must produce the same result as
    /// the corresponding sequence of `write_arg` calls.
    #[cfg(feature = "bytes")]
    #[test]
    fn test_cmd_bufmut_for_next_arg_multiple() {
        let chunks_per_arg: [[&[u8]; 2]; 2] = [[b"foo", b"bar"], [b"baz", b"qux"]];

        let mut buffered = Cmd::new();
        for chunks in chunks_per_arg {
            // One buffer per argument; it goes out of scope at the end of the
            // iteration, before the next buffer is created.
            let mut buf = buffered.bufmut_for_next_arg(6);
            for chunk in chunks {
                buf.put_slice(chunk);
            }
        }

        let mut direct = Cmd::new();
        direct.write_arg(b"foobar");
        direct.write_arg(b"bazqux");

        assert_practical_equivalent(buffered, direct);
    }

    /// An "empty" buffer must be equivalent to `write_arg(b"")`.
    #[cfg(feature = "bytes")]
    #[test]
    fn test_cmd_bufmut_for_next_arg_empty() {
        let mut buffered = Cmd::new();
        {
            let _buf = buffered.bufmut_for_next_arg(0);
        }

        let mut direct = Cmd::new();
        direct.write_arg(b"");

        assert_practical_equivalent(buffered, direct);
    }

    #[cfg(feature = "cluster")]
    #[test]
    fn test_cmd_arg_idx() {
        let set: &[u8] = b"SET";
        let foo: &[u8] = b"foo";
        let forty_two: &[u8] = b"42";

        let mut command = Cmd::new();
        assert_eq!(command.arg_idx(0), None);

        command.arg("SET");
        assert_eq!(command.arg_idx(0), Some(set));
        assert_eq!(command.arg_idx(1), None);

        command.arg("foo").arg("42");
        assert_eq!(command.arg_idx(1), Some(foo));
        assert_eq!(command.arg_idx(2), Some(forty_two));
        assert_eq!(command.arg_idx(3), None);
        assert_eq!(command.arg_idx(4), None);
    }
}