Skip to main content

redis/
cmd.rs

1#[cfg(feature = "aio")]
2use futures_util::{
3    Stream, StreamExt,
4    future::BoxFuture,
5    task::{Context, Poll},
6};
7#[cfg(feature = "aio")]
8use std::pin::Pin;
9#[cfg(feature = "cache-aio")]
10use std::time::Duration;
11use std::{fmt, io, io::Write};
12
13use crate::pipeline::Pipeline;
14use crate::types::{FromRedisValue, RedisResult, RedisWrite, ToRedisArgs, from_redis_value};
15use crate::{ParsingError, connection::ConnectionLike};
16
17/// An argument to a redis command
18#[derive(Clone, PartialEq, Debug)]
19#[non_exhaustive]
20pub enum Arg<D> {
21    /// A normal argument
22    Simple(D),
23    /// A cursor argument created from `cursor_arg()`
24    Cursor,
25}
26
27/// CommandCacheConfig is used to define caching behaviour of individual commands.
28/// # Example
29/// ```rust
30/// use std::time::Duration;
31/// use redis::{CommandCacheConfig, Cmd};
32///
33/// let ttl = Duration::from_secs(120); // 2 minutes TTL
34/// let config = CommandCacheConfig::new()
35///     .set_enable_cache(true)
36///     .set_client_side_ttl(ttl);
37/// let command = Cmd::new().arg("GET").arg("key").set_cache_config(config);
38/// ```
39#[cfg(feature = "cache-aio")]
40#[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
41#[derive(Clone, Debug)]
42pub struct CommandCacheConfig {
43    pub(crate) enable_cache: bool,
44    pub(crate) client_side_ttl: Option<Duration>,
45}
46
47#[cfg(feature = "cache-aio")]
48impl CommandCacheConfig {
49    /// Creates new CommandCacheConfig with enable_cache as true and without client_side_ttl.
50    pub fn new() -> Self {
51        Self {
52            enable_cache: true,
53            client_side_ttl: None,
54        }
55    }
56
57    /// Sets whether the cache should be enabled or not.
58    /// Disabling cache for specific command when using [crate::caching::CacheMode::All] will not work.
59    pub fn set_enable_cache(mut self, enable_cache: bool) -> Self {
60        self.enable_cache = enable_cache;
61        self
62    }
63
64    /// Sets custom client side time to live (TTL).
65    pub fn set_client_side_ttl(mut self, client_side_ttl: Duration) -> Self {
66        self.client_side_ttl = Some(client_side_ttl);
67        self
68    }
69}
70#[cfg(feature = "cache-aio")]
71impl Default for CommandCacheConfig {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
77/// Represents redis commands.
78#[derive(Clone)]
79pub struct Cmd {
80    pub(crate) data: Vec<u8>,
81    // Arg::Simple contains the offset that marks the end of the argument
82    args: Vec<Arg<usize>>,
83    cursor: Option<u64>,
84    // If it's true command's response won't be read from socket. Useful for Pub/Sub.
85    no_response: bool,
86    #[cfg(feature = "cache-aio")]
87    cache: Option<CommandCacheConfig>,
88}
89
90impl std::fmt::Debug for Cmd {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        let mut debug_struct = f.debug_struct("Cmd");
93        debug_struct
94            .field("data", &String::from_utf8_lossy(&self.data).as_ref())
95            .field("args", &self.args)
96            .field("cursor", &self.cursor)
97            .field("no_response", &self.no_response);
98
99        #[cfg(feature = "cache-aio")]
100        debug_struct.field("cache", &self.cache);
101
102        debug_struct.finish()
103    }
104}
105
106/// Represents a redis iterator.
107pub struct Iter<'a, T: FromRedisValue> {
108    iter: CheckedIter<'a, T>,
109}
110impl<T: FromRedisValue> Iterator for Iter<'_, T> {
111    type Item = RedisResult<T>;
112
113    #[inline]
114    fn next(&mut self) -> Option<RedisResult<T>> {
115        self.iter.next()
116    }
117}
118
119/// Represents a safe(r) redis iterator.
120struct CheckedIter<'a, T: FromRedisValue> {
121    batch: std::vec::IntoIter<Result<T, ParsingError>>,
122    con: &'a mut (dyn ConnectionLike + 'a),
123    cmd: Cmd,
124}
125
126impl<T: FromRedisValue> Iterator for CheckedIter<'_, T> {
127    type Item = RedisResult<T>;
128
129    #[inline]
130    fn next(&mut self) -> Option<RedisResult<T>> {
131        // we need to do this in a loop until we produce at least one item
132        // or we find the actual end of the iteration.  This is necessary
133        // because with filtering an iterator it is possible that a whole
134        // chunk is not matching the pattern and thus yielding empty results.
135        loop {
136            if let Some(value) = self.batch.next() {
137                return Some(value.map_err(|err| err.into()));
138            };
139
140            if self.cmd.cursor? == 0 {
141                return None;
142            }
143
144            let (cursor, batch) = match self
145                .con
146                .req_packed_command(&self.cmd.get_packed_command())
147                .and_then(|val| Ok(from_redis_value::<(u64, _)>(val)?))
148            {
149                Ok((cursor, values)) => (cursor, T::from_each_redis_values(values)),
150                Err(e) => return Some(Err(e)),
151            };
152
153            self.cmd.cursor = Some(cursor);
154            self.batch = batch.into_iter();
155        }
156    }
157}
158
159#[cfg(feature = "aio")]
160use crate::aio::ConnectionLike as AsyncConnection;
161
162/// The inner future of AsyncIter
163#[cfg(feature = "aio")]
164struct AsyncIterInner<'a, T: FromRedisValue + 'a> {
165    batch: std::vec::IntoIter<Result<T, ParsingError>>,
166    con: &'a mut (dyn AsyncConnection + Send + 'a),
167    cmd: Cmd,
168}
169
170/// Represents the state of AsyncIter
171#[cfg(feature = "aio")]
172enum IterOrFuture<'a, T: FromRedisValue + 'a> {
173    Iter(AsyncIterInner<'a, T>),
174    Future(BoxFuture<'a, (AsyncIterInner<'a, T>, Option<RedisResult<T>>)>),
175    Empty,
176}
177
178/// Represents a redis iterator that can be used with async connections.
179#[cfg(feature = "aio")]
180pub struct AsyncIter<'a, T: FromRedisValue + 'a> {
181    inner: IterOrFuture<'a, T>,
182}
183
184#[cfg(feature = "aio")]
185impl<'a, T: FromRedisValue + 'a> AsyncIterInner<'a, T> {
186    async fn next_item(&mut self) -> Option<RedisResult<T>> {
187        // we need to do this in a loop until we produce at least one item
188        // or we find the actual end of the iteration.  This is necessary
189        // because with filtering an iterator it is possible that a whole
190        // chunk is not matching the pattern and thus yielding empty results.
191        loop {
192            if let Some(v) = self.batch.next() {
193                return Some(v.map_err(|err| err.into()));
194            };
195
196            if self.cmd.cursor? == 0 {
197                return None;
198            }
199
200            let (cursor, batch) = match self
201                .con
202                .req_packed_command(&self.cmd)
203                .await
204                .and_then(|val| Ok(from_redis_value::<(u64, _)>(val)?))
205            {
206                Ok((cursor, items)) => (cursor, T::from_each_redis_values(items)),
207                Err(e) => return Some(Err(e)),
208            };
209
210            self.cmd.cursor = Some(cursor);
211            self.batch = batch.into_iter();
212        }
213    }
214}
215
216#[cfg(feature = "aio")]
217impl<'a, T: FromRedisValue + 'a + Unpin + Send> AsyncIter<'a, T> {
218    /// ```rust,no_run
219    /// # use redis::AsyncCommands;
220    /// # async fn scan_set() -> redis::RedisResult<()> {
221    /// # let client = redis::Client::open("redis://127.0.0.1/")?;
222    /// # let mut con = client.get_multiplexed_async_connection().await?;
223    /// let _: () = con.sadd("my_set", 42i32).await?;
224    /// let _: () = con.sadd("my_set", 43i32).await?;
225    /// let mut iter: redis::AsyncIter<i32> = con.sscan("my_set").await?;
226    /// while let Some(element) = iter.next_item().await {
227    ///     let element = element?;
228    ///     assert!(element == 42 || element == 43);
229    /// }
230    /// # Ok(())
231    /// # }
232    /// ```
233    #[inline]
234    pub async fn next_item(&mut self) -> Option<RedisResult<T>> {
235        StreamExt::next(self).await
236    }
237}
238
239#[cfg(feature = "aio")]
240impl<'a, T: FromRedisValue + Unpin + Send + 'a> Stream for AsyncIter<'a, T> {
241    type Item = RedisResult<T>;
242
243    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
244        let this = self.get_mut();
245        let inner = std::mem::replace(&mut this.inner, IterOrFuture::Empty);
246        match inner {
247            IterOrFuture::Iter(mut iter) => {
248                let fut = async move {
249                    let next_item = iter.next_item().await;
250                    (iter, next_item)
251                };
252                this.inner = IterOrFuture::Future(Box::pin(fut));
253                Pin::new(this).poll_next(cx)
254            }
255            IterOrFuture::Future(mut fut) => match fut.as_mut().poll(cx) {
256                Poll::Pending => {
257                    this.inner = IterOrFuture::Future(fut);
258                    Poll::Pending
259                }
260                Poll::Ready((iter, value)) => {
261                    this.inner = IterOrFuture::Iter(iter);
262
263                    Poll::Ready(value)
264                }
265            },
266            IterOrFuture::Empty => unreachable!(),
267        }
268    }
269}
270
271fn countdigits(mut v: usize) -> usize {
272    let mut result = 1;
273    loop {
274        if v < 10 {
275            return result;
276        }
277        if v < 100 {
278            return result + 1;
279        }
280        if v < 1000 {
281            return result + 2;
282        }
283        if v < 10000 {
284            return result + 3;
285        }
286
287        v /= 10000;
288        result += 4;
289    }
290}
291
292#[inline]
293fn bulklen(len: usize) -> usize {
294    1 + countdigits(len) + 2 + len + 2
295}
296
297fn args_len<'a, I>(args: I, cursor: u64) -> usize
298where
299    I: IntoIterator<Item = Arg<&'a [u8]>> + ExactSizeIterator,
300{
301    let mut totlen = 1 + countdigits(args.len()) + 2;
302    for item in args {
303        totlen += bulklen(match item {
304            Arg::Cursor => countdigits(cursor as usize),
305            Arg::Simple(val) => val.len(),
306        });
307    }
308    totlen
309}
310
311pub(crate) fn cmd_len(cmd: &Cmd) -> usize {
312    args_len(cmd.args_iter(), cmd.cursor.unwrap_or(0))
313}
314
315fn encode_command<'a, I>(args: I, cursor: u64) -> Vec<u8>
316where
317    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
318{
319    let mut cmd = Vec::new();
320    write_command_to_vec(&mut cmd, args, cursor);
321    cmd
322}
323
324fn write_command_to_vec<'a, I>(cmd: &mut Vec<u8>, args: I, cursor: u64)
325where
326    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
327{
328    let totlen = args_len(args.clone(), cursor);
329
330    cmd.reserve(totlen);
331
332    write_command(cmd, args, cursor).unwrap()
333}
334
335fn write_command<'a, I>(cmd: &mut (impl ?Sized + Write), args: I, cursor: u64) -> io::Result<()>
336where
337    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
338{
339    let mut buf = ::itoa::Buffer::new();
340
341    cmd.write_all(b"*")?;
342    let s = buf.format(args.len());
343    cmd.write_all(s.as_bytes())?;
344    cmd.write_all(b"\r\n")?;
345
346    let mut cursor_bytes = itoa::Buffer::new();
347    for item in args {
348        let bytes = match item {
349            Arg::Cursor => cursor_bytes.format(cursor).as_bytes(),
350            Arg::Simple(val) => val,
351        };
352
353        cmd.write_all(b"$")?;
354        let s = buf.format(bytes.len());
355        cmd.write_all(s.as_bytes())?;
356        cmd.write_all(b"\r\n")?;
357
358        cmd.write_all(bytes)?;
359        cmd.write_all(b"\r\n")?;
360    }
361    Ok(())
362}
363
364impl RedisWrite for Cmd {
365    fn write_arg(&mut self, arg: &[u8]) {
366        self.data.extend_from_slice(arg);
367        self.args.push(Arg::Simple(self.data.len()));
368    }
369
370    fn write_arg_fmt(&mut self, arg: impl fmt::Display) {
371        write!(self.data, "{arg}").unwrap();
372        self.args.push(Arg::Simple(self.data.len()));
373    }
374
375    fn writer_for_next_arg(&mut self) -> impl Write + '_ {
376        struct CmdBufferedArgGuard<'a>(&'a mut Cmd);
377        impl Drop for CmdBufferedArgGuard<'_> {
378            fn drop(&mut self) {
379                self.0.args.push(Arg::Simple(self.0.data.len()));
380            }
381        }
382        impl Write for CmdBufferedArgGuard<'_> {
383            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
384                self.0.data.extend_from_slice(buf);
385                Ok(buf.len())
386            }
387
388            fn flush(&mut self) -> std::io::Result<()> {
389                Ok(())
390            }
391        }
392
393        CmdBufferedArgGuard(self)
394    }
395
396    fn reserve_space_for_args(&mut self, additional: impl IntoIterator<Item = usize>) {
397        let mut capacity = 0;
398        let mut args = 0;
399        for add in additional {
400            capacity += add;
401            args += 1;
402        }
403        self.data.reserve(capacity);
404        self.args.reserve(args);
405    }
406
407    #[cfg(feature = "bytes")]
408    fn bufmut_for_next_arg(&mut self, capacity: usize) -> impl bytes::BufMut + '_ {
409        self.data.reserve(capacity);
410        struct CmdBufferedArgGuard<'a>(&'a mut Cmd);
411        impl Drop for CmdBufferedArgGuard<'_> {
412            fn drop(&mut self) {
413                self.0.args.push(Arg::Simple(self.0.data.len()));
414            }
415        }
416        unsafe impl bytes::BufMut for CmdBufferedArgGuard<'_> {
417            fn remaining_mut(&self) -> usize {
418                self.0.data.remaining_mut()
419            }
420
421            unsafe fn advance_mut(&mut self, cnt: usize) {
422                unsafe {
423                    self.0.data.advance_mut(cnt);
424                }
425            }
426
427            fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
428                self.0.data.chunk_mut()
429            }
430
431            // Vec specializes these methods, so we do too
432            fn put<T: bytes::buf::Buf>(&mut self, src: T)
433            where
434                Self: Sized,
435            {
436                self.0.data.put(src);
437            }
438
439            fn put_slice(&mut self, src: &[u8]) {
440                self.0.data.put_slice(src);
441            }
442
443            fn put_bytes(&mut self, val: u8, cnt: usize) {
444                self.0.data.put_bytes(val, cnt);
445            }
446        }
447
448        CmdBufferedArgGuard(self)
449    }
450}
451
452impl Default for Cmd {
453    fn default() -> Cmd {
454        Cmd::new()
455    }
456}
457
458/// A command acts as a builder interface to creating encoded redis
459/// requests.  This allows you to easily assemble a packed command
460/// by chaining arguments together.
461///
462/// Basic example:
463///
464/// ```rust
465/// redis::Cmd::new().arg("SET").arg("my_key").arg(42);
466/// ```
467///
468/// There is also a helper function called `cmd` which makes it a
469/// tiny bit shorter:
470///
471/// ```rust
472/// redis::cmd("SET").arg("my_key").arg(42);
473/// ```
474///
475/// Because Rust currently does not have an ideal system
476/// for lifetimes of temporaries, sometimes you need to hold on to
477/// the initially generated command:
478///
479/// ```rust,no_run
480/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
481/// # let mut con = client.get_connection().unwrap();
482/// let mut cmd = redis::cmd("SMEMBERS");
483/// let mut iter : redis::Iter<i32> = cmd.arg("my_set").clone().iter(&mut con).unwrap();
484/// ```
485impl Cmd {
486    /// Creates a new empty command.
487    pub fn new() -> Cmd {
488        Cmd {
489            data: vec![],
490            args: vec![],
491            cursor: None,
492            no_response: false,
493            #[cfg(feature = "cache-aio")]
494            cache: None,
495        }
496    }
497
498    /// Creates a new empty command, with at least the requested capacity.
499    pub fn with_capacity(arg_count: usize, size_of_data: usize) -> Cmd {
500        Cmd {
501            data: Vec::with_capacity(size_of_data),
502            args: Vec::with_capacity(arg_count),
503            cursor: None,
504            no_response: false,
505            #[cfg(feature = "cache-aio")]
506            cache: None,
507        }
508    }
509
510    /// Get the capacities for the internal buffers.
511    #[cfg(test)]
512    #[allow(dead_code)]
513    pub(crate) fn capacity(&self) -> (usize, usize) {
514        (self.args.capacity(), self.data.capacity())
515    }
516
517    /// Clears the command, resetting it completely.
518    ///
519    /// This is equivalent to [`Cmd::new`], except the buffer capacity is kept.
520    ///
521    /// # Examples
522    ///
523    /// ```rust,no_run
524    /// # use redis::{Client, Cmd};
525    /// # let client = Client::open("redis://127.0.0.1/").unwrap();
526    /// # let mut con = client.get_connection().expect("Failed to connect to Redis");
527    /// let mut cmd = Cmd::new();
528    /// cmd.arg("SET").arg("foo").arg("42");
529    /// cmd.query::<()>(&mut con).expect("Query failed");
530    /// cmd.clear();
531    /// // This reuses the allocations of the previous command
532    /// cmd.arg("SET").arg("bar").arg("42");
533    /// cmd.query::<()>(&mut con).expect("Query failed");
534    /// ```
535    pub fn clear(&mut self) {
536        self.data.clear();
537        self.args.clear();
538        self.cursor = None;
539        self.no_response = false;
540        #[cfg(feature = "cache-aio")]
541        {
542            self.cache = None;
543        }
544    }
545
546    /// Appends an argument to the command.  The argument passed must
547    /// be a type that implements `ToRedisArgs`.  Most primitive types as
548    /// well as vectors of primitive types implement it.
549    ///
550    /// For instance all of the following are valid:
551    ///
552    /// ```rust,no_run
553    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
554    /// # let mut con = client.get_connection().unwrap();
555    /// redis::cmd("SET").arg(&["my_key", "my_value"]);
556    /// redis::cmd("SET").arg("my_key").arg(42);
557    /// redis::cmd("SET").arg("my_key").arg(b"my_value");
558    /// ```
559    #[inline]
560    pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Cmd {
561        arg.write_redis_args(self);
562        self
563    }
564
565    /// Takes the command out of the mutable reference and returns it as a value
566    ///
567    /// The referenced command is left empty.
568    pub fn take(&mut self) -> Self {
569        std::mem::take(self)
570    }
571
572    /// Works similar to `arg` but adds a cursor argument.
573    ///
574    /// This is always an integer and also flips the command implementation to support a
575    /// different mode for the iterators where the iterator will ask for
576    /// another batch of items when the local data is exhausted.
577    /// Calling this function more than once will overwrite the previous cursor with the latest set value.
578    ///
579    /// ```rust,no_run
580    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
581    /// # let mut con = client.get_connection().unwrap();
582    /// let mut cmd = redis::cmd("SSCAN");
583    /// let mut iter : redis::Iter<isize> =
584    ///     cmd.arg("my_set").cursor_arg(0).clone().iter(&mut con).unwrap();
585    /// for x in iter {
586    ///     // do something with the item
587    /// }
588    /// ```
589    #[inline]
590    pub fn cursor_arg(&mut self, cursor: u64) -> &mut Cmd {
591        self.cursor = Some(cursor);
592        self.args.push(Arg::Cursor);
593        self
594    }
595
596    /// Returns the packed command as a byte vector.
597    ///
598    /// This is a wrapper around [`write_packed_command`] that creates a [`Vec`] to write to.
599    ///
600    /// [`write_packed_command`]: Self::write_packed_command
601    #[inline]
602    pub fn get_packed_command(&self) -> Vec<u8> {
603        let mut cmd = Vec::new();
604        if self.is_empty() {
605            return cmd;
606        }
607        self.write_packed_command(&mut cmd);
608        cmd
609    }
610
611    /// Writes the packed command to `dst`.
612    ///
613    /// This will *append* the packed command.
614    ///
615    /// See also [`get_packed_command`].
616    ///
617    /// [`get_packed_command`]: Self::get_packed_command.
618    #[inline]
619    pub fn write_packed_command(&self, dst: &mut Vec<u8>) {
620        write_command_to_vec(dst, self.args_iter(), self.cursor.unwrap_or(0))
621    }
622
623    pub(crate) fn write_packed_command_preallocated(&self, cmd: &mut Vec<u8>) {
624        write_command(cmd, self.args_iter(), self.cursor.unwrap_or(0)).unwrap()
625    }
626
627    /// Returns true if the command is in scan mode.
628    #[inline]
629    pub fn in_scan_mode(&self) -> bool {
630        self.cursor.is_some()
631    }
632
633    /// Sends the command as query to the connection and converts the
634    /// result to the target redis value.  This is the general way how
635    /// you can retrieve data.
636    #[inline]
637    pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
638        match con.req_command(self) {
639            Ok(val) => Ok(from_redis_value(val.extract_error()?)?),
640            Err(e) => Err(e),
641        }
642    }
643
644    /// Async version of `query`.
645    #[inline]
646    #[cfg(feature = "aio")]
647    pub async fn query_async<T: FromRedisValue>(
648        &self,
649        con: &mut impl crate::aio::ConnectionLike,
650    ) -> RedisResult<T> {
651        let val = con.req_packed_command(self).await?;
652        Ok(from_redis_value(val.extract_error()?)?)
653    }
654
655    /// Sets the cursor and converts the passed value to a batch used by the
656    /// iterators.
657    fn set_cursor_and_get_batch<T: FromRedisValue>(
658        &mut self,
659        value: crate::Value,
660    ) -> RedisResult<Vec<Result<T, ParsingError>>> {
661        let (cursor, values) = if value.looks_like_cursor() {
662            let (cursor, values) = from_redis_value::<(u64, _)>(value)?;
663            (cursor, values)
664        } else {
665            (0, from_redis_value(value)?)
666        };
667
668        self.cursor = Some(cursor);
669
670        Ok(T::from_each_redis_values(values))
671    }
672
673    /// Similar to `query()` but returns an iterator over the items of the
674    /// bulk result or iterator.  In normal mode this is not in any way more
675    /// efficient than just querying into a `Vec<T>` as it's internally
676    /// implemented as buffering into a vector.  This however is useful when
677    /// `cursor_arg` was used in which case the iterator will query for more
678    /// items until the server side cursor is exhausted.
679    ///
680    /// This is useful for commands such as `SSCAN`, `SCAN` and others.
681    ///
682    /// One speciality of this function is that it will check if the response
683    /// looks like a cursor or not and always just looks at the payload.
684    /// This way you can use the function the same for responses in the
685    /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
686    /// tuple of cursor and list).
687    #[inline]
688    pub fn iter<T: FromRedisValue>(
689        mut self,
690        con: &mut dyn ConnectionLike,
691    ) -> RedisResult<Iter<'_, T>> {
692        let rv = con.req_command(&self)?;
693
694        let batch = self.set_cursor_and_get_batch(rv)?;
695
696        Ok(Iter {
697            iter: CheckedIter {
698                batch: batch.into_iter(),
699                con,
700                cmd: self,
701            },
702        })
703    }
704
705    /// Similar to `iter()` but returns an AsyncIter over the items of the
706    /// bulk result or iterator.  A [futures::Stream](https://docs.rs/futures/0.3.3/futures/stream/trait.Stream.html)
707    /// is implemented on AsyncIter. In normal mode this is not in any way more
708    /// efficient than just querying into a `Vec<T>` as it's internally
709    /// implemented as buffering into a vector.  This however is useful when
710    /// `cursor_arg` was used in which case the stream will query for more
711    /// items until the server side cursor is exhausted.
712    ///
713    /// This is useful for commands such as `SSCAN`, `SCAN` and others in async contexts.
714    ///
715    /// One speciality of this function is that it will check if the response
716    /// looks like a cursor or not and always just looks at the payload.
717    /// This way you can use the function the same for responses in the
718    /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
719    /// tuple of cursor and list).
720    #[cfg(feature = "aio")]
721    #[inline]
722    pub async fn iter_async<'a, T: FromRedisValue + 'a>(
723        mut self,
724        con: &'a mut (dyn AsyncConnection + Send),
725    ) -> RedisResult<AsyncIter<'a, T>> {
726        let rv = con.req_packed_command(&self).await?;
727
728        let batch = self.set_cursor_and_get_batch(rv)?;
729
730        Ok(AsyncIter {
731            inner: IterOrFuture::Iter(AsyncIterInner {
732                batch: batch.into_iter(),
733                con,
734                cmd: self,
735            }),
736        })
737    }
738
739    /// This is an alternative to `query`` that can be used if you want to be able to handle a
740    /// command's success or failure but don't care about the command's response. For example,
741    /// this is useful for "SET" commands for which the response's content is not important.
742    /// It avoids the need to define generic bounds for ().
743    #[inline]
744    pub fn exec(&self, con: &mut dyn ConnectionLike) -> RedisResult<()> {
745        self.query::<()>(con)
746    }
747
748    /// This is an alternative to `query_async` that can be used if you want to be able to handle a
749    /// command's success or failure but don't care about the command's response. For example,
750    /// this is useful for "SET" commands for which the response's content is not important.
751    /// It avoids the need to define generic bounds for ().
752    #[cfg(feature = "aio")]
753    pub async fn exec_async(&self, con: &mut impl crate::aio::ConnectionLike) -> RedisResult<()> {
754        self.query_async::<()>(con).await
755    }
756
757    /// Returns an iterator over the arguments in this command (including the command name itself)
758    pub fn args_iter(&self) -> impl Clone + ExactSizeIterator<Item = Arg<&[u8]>> {
759        let mut prev = 0;
760        self.args.iter().map(move |arg| match *arg {
761            Arg::Simple(i) => {
762                let arg = Arg::Simple(&self.data[prev..i]);
763                prev = i;
764                arg
765            }
766
767            Arg::Cursor => Arg::Cursor,
768        })
769    }
770
771    // Get a reference to the argument at `idx`
772    #[cfg(any(feature = "cluster", feature = "cache-aio"))]
773    pub(crate) fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
774        if idx >= self.args.len() {
775            return None;
776        }
777
778        let start = if idx == 0 {
779            0
780        } else {
781            match self.args[idx - 1] {
782                Arg::Simple(n) => n,
783                _ => 0,
784            }
785        };
786        let end = match self.args[idx] {
787            Arg::Simple(n) => n,
788            _ => 0,
789        };
790        if start == 0 && end == 0 {
791            return None;
792        }
793        Some(&self.data[start..end])
794    }
795
796    /// Client won't read and wait for results. Currently only used for Pub/Sub commands in RESP3.
797    ///
798    /// This is mostly set internally. The user can set it if they know that a certain command doesn't return a response, or if they use an async connection and don't want to wait for the server response.
799    /// For sync connections, setting this wrongly can affect the connection's correctness, and should be avoided.
800    #[inline]
801    pub fn set_no_response(&mut self, nr: bool) -> &mut Cmd {
802        self.no_response = nr;
803        self
804    }
805
806    /// Check whether command's result will be waited for.
807    #[inline]
808    pub fn is_no_response(&self) -> bool {
809        self.no_response
810    }
811
812    /// Changes caching behaviour for this specific command.
813    #[cfg(feature = "cache-aio")]
814    #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
815    pub fn set_cache_config(&mut self, command_cache_config: CommandCacheConfig) -> &mut Cmd {
816        self.cache = Some(command_cache_config);
817        self
818    }
819
820    #[cfg(feature = "cache-aio")]
821    #[inline]
822    pub(crate) fn get_cache_config(&self) -> &Option<CommandCacheConfig> {
823        &self.cache
824    }
825
826    pub(crate) fn is_empty(&self) -> bool {
827        self.args.is_empty()
828    }
829}
830
831/// Shortcut function to creating a command with a single argument.
832///
833/// The first argument of a redis command is always the name of the command
834/// which needs to be a string.  This is the recommended way to start a
835/// command pipe.
836///
837/// ```rust
838/// redis::cmd("PING");
839/// ```
840pub fn cmd(name: &str) -> Cmd {
841    let mut rv = Cmd::new();
842    rv.arg(name);
843    rv
844}
845
846/// Packs a bunch of commands into a request.
847///
848/// This is generally a quite useless function as this functionality is
849/// nicely wrapped through the `Cmd` object, but in some cases it can be
850/// useful.  The return value of this can then be send to the low level
851/// `ConnectionLike` methods.
852///
853/// Example:
854///
855/// ```rust
856/// # use redis::ToRedisArgs;
857/// let mut args = vec![];
858/// args.extend("SET".to_redis_args());
859/// args.extend("my_key".to_redis_args());
860/// args.extend(42.to_redis_args());
861/// let cmd = redis::pack_command(&args);
862/// assert_eq!(cmd, b"*3\r\n$3\r\nSET\r\n$6\r\nmy_key\r\n$2\r\n42\r\n".to_vec());
863/// ```
864pub fn pack_command(args: &[Vec<u8>]) -> Vec<u8> {
865    encode_command(args.iter().map(|x| Arg::Simple(&x[..])), 0)
866}
867
868/// Shortcut for creating a new pipeline.
869pub fn pipe() -> Pipeline {
870    Pipeline::new()
871}
872
873#[cfg(test)]
874mod tests {
875    use super::*;
876    #[cfg(feature = "bytes")]
877    use bytes::BufMut;
878
879    fn args_iter_to_str(cmd: &Cmd) -> Vec<String> {
880        cmd.args_iter()
881            .map(|arg| match arg {
882                Arg::Simple(bytes) => String::from_utf8(bytes.to_vec()).unwrap(),
883                Arg::Cursor => "CURSOR".to_string(),
884            })
885            .collect()
886    }
887
888    fn assert_arg_equality(c1: &Cmd, c2: &Cmd) {
889        let v1: Vec<_> = c1.args_iter().collect::<Vec<_>>();
890        let v2: Vec<_> = c2.args_iter().collect::<Vec<_>>();
891        assert_eq!(
892            v1,
893            v2,
894            "{:?} - {:?}",
895            args_iter_to_str(c1),
896            args_iter_to_str(c2)
897        );
898    }
899
900    fn assert_practical_equivalent(c1: Cmd, c2: Cmd) {
901        assert_eq!(c1.get_packed_command(), c2.get_packed_command());
902        assert_arg_equality(&c1, &c2);
903    }
904
905    #[test]
906    fn test_cmd_packed_command_simple_args() {
907        let args: &[&[u8]] = &[b"phone", b"barz"];
908        let mut cmd = cmd("key");
909        cmd.write_arg_fmt("value");
910        cmd.arg(42).arg(args);
911
912        let packed_command = cmd.get_packed_command();
913        assert_eq!(cmd_len(&cmd), packed_command.len());
914        assert_eq!(
915            packed_command,
916            b"*5\r\n$3\r\nkey\r\n$5\r\nvalue\r\n$2\r\n42\r\n$5\r\nphone\r\n$4\r\nbarz\r\n",
917            "{}",
918            String::from_utf8(packed_command.clone()).unwrap()
919        );
920        let args_vec: Vec<&[u8]> = vec![b"key", b"value", b"42", b"phone", b"barz"];
921        let args_vec: Vec<_> = args_vec.into_iter().map(Arg::Simple).collect();
922        assert_eq!(cmd.args_iter().collect::<Vec<_>>(), args_vec);
923    }
924
925    #[test]
926    fn test_cmd_packed_command_with_cursor() {
927        let args: &[&[u8]] = &[b"phone", b"barz"];
928        let mut cmd = cmd("key");
929        cmd.arg("value").arg(42).arg(args).cursor_arg(512);
930
931        let packed_command = cmd.get_packed_command();
932        assert_eq!(cmd_len(&cmd), packed_command.len());
933        assert_eq!(
934            packed_command,
935            b"*6\r\n$3\r\nkey\r\n$5\r\nvalue\r\n$2\r\n42\r\n$5\r\nphone\r\n$4\r\nbarz\r\n$3\r\n512\r\n",
936            "{}",
937            String::from_utf8(packed_command.clone()).unwrap()
938        );
939        let args_vec: Vec<&[u8]> = vec![b"key", b"value", b"42", b"phone", b"barz"];
940        let args_vec: Vec<_> = args_vec
941            .into_iter()
942            .map(Arg::Simple)
943            .chain(std::iter::once(Arg::Cursor))
944            .collect();
945        assert_eq!(cmd.args_iter().collect::<Vec<_>>(), args_vec);
946    }
947
948    #[test]
949    fn test_cmd_clean() {
950        let mut cmd = cmd("key");
951        cmd.arg("value")
952            .cursor_arg(24)
953            .set_no_response(true)
954            .clear();
955
956        // Everything should be reset, but the capacity should still be there
957        assert!(cmd.data.is_empty());
958        assert!(cmd.data.capacity() > 0);
959        assert!(cmd.is_empty());
960        assert!(cmd.args.capacity() > 0);
961        assert_eq!(cmd.cursor, None);
962        assert!(!cmd.no_response);
963        assert_practical_equivalent(cmd, Cmd::new());
964    }
965
966    #[test]
967    #[cfg(feature = "cache-aio")]
968    fn test_cmd_clean_cache_aio() {
969        let mut cmd = cmd("key");
970        cmd.arg("value")
971            .cursor_arg(24)
972            .set_cache_config(crate::CommandCacheConfig::default())
973            .set_no_response(true)
974            .clear();
975
976        // Everything should be reset, but the capacity should still be there
977        assert!(cmd.data.is_empty());
978        assert!(cmd.data.capacity() > 0);
979        assert!(cmd.is_empty());
980        assert!(cmd.args.capacity() > 0);
981        assert_eq!(cmd.cursor, None);
982        assert!(!cmd.no_response);
983        assert!(cmd.cache.is_none());
984    }
985
986    #[test]
987    fn test_cmd_writer_for_next_arg() {
988        // Test that a write split across multiple calls to `write` produces the
989        // same result as a single call to `write_arg`
990        let mut c1 = Cmd::new();
991        {
992            let mut c1_writer = c1.writer_for_next_arg();
993            c1_writer.write_all(b"foo").unwrap();
994            c1_writer.write_all(b"bar").unwrap();
995            c1_writer.flush().unwrap();
996        }
997
998        let mut c2 = Cmd::new();
999        c2.write_arg(b"foobar");
1000
1001        assert_practical_equivalent(c1, c2);
1002    }
1003
1004    // Test that multiple writers to the same command produce the same
1005    // result as the same multiple calls to `write_arg`
1006    #[test]
1007    fn test_cmd_writer_for_next_arg_multiple() {
1008        let mut c1 = Cmd::new();
1009        {
1010            let mut c1_writer = c1.writer_for_next_arg();
1011            c1_writer.write_all(b"foo").unwrap();
1012            c1_writer.write_all(b"bar").unwrap();
1013            c1_writer.flush().unwrap();
1014        }
1015        {
1016            let mut c1_writer = c1.writer_for_next_arg();
1017            c1_writer.write_all(b"baz").unwrap();
1018            c1_writer.write_all(b"qux").unwrap();
1019            c1_writer.flush().unwrap();
1020        }
1021
1022        let mut c2 = Cmd::new();
1023        c2.write_arg(b"foobar");
1024        c2.write_arg(b"bazqux");
1025
1026        assert_practical_equivalent(c1, c2);
1027    }
1028
1029    // Test that an "empty" write produces the equivalent to `write_arg(b"")`
1030    #[test]
1031    fn test_cmd_writer_for_next_arg_empty() {
1032        let mut c1 = Cmd::new();
1033        {
1034            let mut c1_writer = c1.writer_for_next_arg();
1035            c1_writer.flush().unwrap();
1036        }
1037
1038        let mut c2 = Cmd::new();
1039        c2.write_arg(b"");
1040
1041        assert_practical_equivalent(c1, c2);
1042    }
1043
1044    #[cfg(feature = "bytes")]
1045    /// Test that a write split across multiple calls to `write` produces the
1046    /// same result as a single call to `write_arg`
1047    #[test]
1048    fn test_cmd_bufmut_for_next_arg() {
1049        let mut c1 = Cmd::new();
1050        {
1051            let mut c1_writer = c1.bufmut_for_next_arg(6);
1052            c1_writer.put_slice(b"foo");
1053            c1_writer.put_slice(b"bar");
1054        }
1055
1056        let mut c2 = Cmd::new();
1057        c2.write_arg(b"foobar");
1058
1059        assert_practical_equivalent(c1, c2);
1060    }
1061
1062    #[cfg(feature = "bytes")]
1063    /// Test that multiple writers to the same command produce the same
1064    /// result as the same multiple calls to `write_arg`
1065    #[test]
1066    fn test_cmd_bufmut_for_next_arg_multiple() {
1067        let mut c1 = Cmd::new();
1068        {
1069            let mut c1_writer = c1.bufmut_for_next_arg(6);
1070            c1_writer.put_slice(b"foo");
1071            c1_writer.put_slice(b"bar");
1072        }
1073        {
1074            let mut c1_writer = c1.bufmut_for_next_arg(6);
1075            c1_writer.put_slice(b"baz");
1076            c1_writer.put_slice(b"qux");
1077        }
1078
1079        let mut c2 = Cmd::new();
1080        c2.write_arg(b"foobar");
1081        c2.write_arg(b"bazqux");
1082
1083        assert_practical_equivalent(c1, c2);
1084    }
1085
1086    #[cfg(feature = "bytes")]
1087    /// Test that an "empty" write produces the equivalent to `write_arg(b"")`
1088    #[test]
1089    fn test_cmd_bufmut_for_next_arg_empty() {
1090        let mut c1 = Cmd::new();
1091        {
1092            let _c1_writer = c1.bufmut_for_next_arg(0);
1093        }
1094
1095        let mut c2 = Cmd::new();
1096        c2.write_arg(b"");
1097
1098        assert_practical_equivalent(c1, c2);
1099    }
1100
1101    #[test]
1102    #[cfg(feature = "cluster")]
1103    fn test_cmd_arg_idx() {
1104        let mut c = Cmd::new();
1105        assert_eq!(c.arg_idx(0), None);
1106
1107        c.arg("SET");
1108        assert_eq!(c.arg_idx(0), Some(&b"SET"[..]));
1109        assert_eq!(c.arg_idx(1), None);
1110
1111        c.arg("foo").arg("42");
1112        assert_eq!(c.arg_idx(1), Some(&b"foo"[..]));
1113        assert_eq!(c.arg_idx(2), Some(&b"42"[..]));
1114        assert_eq!(c.arg_idx(3), None);
1115        assert_eq!(c.arg_idx(4), None);
1116    }
1117}