redis/
pipeline.rs

1#![macro_use]
2
3#[cfg(feature = "cache-aio")]
4use crate::cmd::CommandCacheConfig;
5use crate::cmd::{cmd, cmd_len, Cmd};
6use crate::connection::ConnectionLike;
7use crate::types::{
8    from_owned_redis_value, ErrorKind, FromRedisValue, HashSet, RedisResult, ToRedisArgs, Value,
9};
10
11/// Represents a redis command pipeline.
12#[derive(Clone)]
13pub struct Pipeline {
14    pub(crate) commands: Vec<Cmd>,
15    pub(crate) transaction_mode: bool,
16    pub(crate) ignored_commands: HashSet<usize>,
17}
18
19/// A pipeline allows you to send multiple commands in one go to the
20/// redis server.  API wise it's very similar to just using a command
21/// but it allows multiple commands to be chained and some features such
22/// as iteration are not available.
23///
24/// Basic example:
25///
26/// ```rust,no_run
27/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
28/// # let mut con = client.get_connection().unwrap();
29/// let ((k1, k2),) : ((i32, i32),) = redis::pipe()
30///     .cmd("SET").arg("key_1").arg(42).ignore()
31///     .cmd("SET").arg("key_2").arg(43).ignore()
32///     .cmd("MGET").arg(&["key_1", "key_2"]).query(&mut con).unwrap();
33/// ```
34///
35/// As you can see with `cmd` you can start a new command.  By default
36/// each command produces a value but for some you can ignore them by
37/// calling `ignore` on the command.  That way it will be skipped in the
38/// return value which is useful for `SET` commands and others, which
39/// do not have a useful return value.
40impl Pipeline {
41    /// Creates an empty pipeline.  For consistency with the `cmd`
42    /// api a `pipe` function is provided as alias.
43    pub fn new() -> Pipeline {
44        Self::with_capacity(0)
45    }
46
47    /// Creates an empty pipeline with pre-allocated capacity.
48    pub fn with_capacity(capacity: usize) -> Pipeline {
49        Pipeline {
50            commands: Vec::with_capacity(capacity),
51            transaction_mode: false,
52            ignored_commands: HashSet::new(),
53        }
54    }
55
56    /// This enables atomic mode.  In atomic mode the whole pipeline is
57    /// enclosed in `MULTI`/`EXEC`.  From the user's point of view nothing
58    /// changes however.  This is easier than using `MULTI`/`EXEC` yourself
59    /// as the format does not change.
60    ///
61    /// ```rust,no_run
62    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
63    /// # let mut con = client.get_connection().unwrap();
64    /// let (k1, k2) : (i32, i32) = redis::pipe()
65    ///     .atomic()
66    ///     .cmd("GET").arg("key_1")
67    ///     .cmd("GET").arg("key_2").query(&mut con).unwrap();
68    /// ```
69    #[inline]
70    pub fn atomic(&mut self) -> &mut Pipeline {
71        self.transaction_mode = true;
72        self
73    }
74
75    #[cfg(feature = "aio")]
76    pub(crate) fn is_transaction(&self) -> bool {
77        self.transaction_mode
78    }
79
80    /// Returns the encoded pipeline commands.
81    pub fn get_packed_pipeline(&self) -> Vec<u8> {
82        encode_pipeline(&self.commands, self.transaction_mode)
83    }
84
85    #[cfg(feature = "aio")]
86    pub(crate) fn write_packed_pipeline(&self, out: &mut Vec<u8>) {
87        write_pipeline(out, &self.commands, self.transaction_mode)
88    }
89
90    /// Returns the number of commands currently queued by the usr in the pipeline.
91    ///
92    /// Depending on its configuration (e.g. `atomic`), the pipeline may send more commands to the server than the returned length
93    pub fn len(&self) -> usize {
94        self.commands.len()
95    }
96
97    /// Returns `true` is the pipeline contains no elements.
98    pub fn is_empty(&self) -> bool {
99        self.commands.is_empty()
100    }
101
102    fn execute_pipelined(&self, con: &mut dyn ConnectionLike) -> RedisResult<Value> {
103        self.make_pipeline_results(con.req_packed_commands(
104            &encode_pipeline(&self.commands, false),
105            0,
106            self.commands.len(),
107        )?)
108    }
109
110    fn execute_transaction(&self, con: &mut dyn ConnectionLike) -> RedisResult<Value> {
111        let mut resp = con.req_packed_commands(
112            &encode_pipeline(&self.commands, true),
113            self.commands.len() + 1,
114            1,
115        )?;
116
117        match resp.pop() {
118            Some(Value::Nil) => Ok(Value::Nil),
119            Some(Value::Array(items)) => self.make_pipeline_results(items),
120            _ => fail!((
121                ErrorKind::ResponseError,
122                "Invalid response when parsing multi response"
123            )),
124        }
125    }
126
127    /// Executes the pipeline and fetches the return values.  Since most
128    /// pipelines return different types it's recommended to use tuple
129    /// matching to process the results:
130    ///
131    /// ```rust,no_run
132    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
133    /// # let mut con = client.get_connection().unwrap();
134    /// let (k1, k2) : (i32, i32) = redis::pipe()
135    ///     .cmd("SET").arg("key_1").arg(42).ignore()
136    ///     .cmd("SET").arg("key_2").arg(43).ignore()
137    ///     .cmd("GET").arg("key_1")
138    ///     .cmd("GET").arg("key_2").query(&mut con).unwrap();
139    /// ```
140    ///
141    /// NOTE: A Pipeline object may be reused after `query()` with all the commands as were inserted
142    ///       to them. In order to clear a Pipeline object with minimal memory released/allocated,
143    ///       it is necessary to call the `clear()` before inserting new commands.
144    #[inline]
145    pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
146        if !con.supports_pipelining() {
147            fail!((
148                ErrorKind::ResponseError,
149                "This connection does not support pipelining."
150            ));
151        }
152        let value = if self.commands.is_empty() {
153            Value::Array(vec![])
154        } else if self.transaction_mode {
155            self.execute_transaction(con)?
156        } else {
157            self.execute_pipelined(con)?
158        };
159
160        from_owned_redis_value(value.extract_error()?)
161    }
162
163    #[cfg(feature = "aio")]
164    async fn execute_pipelined_async<C>(&self, con: &mut C) -> RedisResult<Value>
165    where
166        C: crate::aio::ConnectionLike,
167    {
168        let value = con
169            .req_packed_commands(self, 0, self.commands.len())
170            .await?;
171        self.make_pipeline_results(value)
172    }
173
174    #[cfg(feature = "aio")]
175    async fn execute_transaction_async<C>(&self, con: &mut C) -> RedisResult<Value>
176    where
177        C: crate::aio::ConnectionLike,
178    {
179        let mut resp = con
180            .req_packed_commands(self, self.commands.len() + 1, 1)
181            .await?;
182        match resp.pop() {
183            Some(Value::Nil) => Ok(Value::Nil),
184            Some(Value::Array(items)) => self.make_pipeline_results(items),
185            _ => Err((
186                ErrorKind::ResponseError,
187                "Invalid response when parsing multi response",
188            )
189                .into()),
190        }
191    }
192
193    /// Async version of `query`.
194    #[inline]
195    #[cfg(feature = "aio")]
196    pub async fn query_async<T: FromRedisValue>(
197        &self,
198        con: &mut impl crate::aio::ConnectionLike,
199    ) -> RedisResult<T> {
200        let value = if self.commands.is_empty() {
201            return from_owned_redis_value(Value::Array(vec![]));
202        } else if self.transaction_mode {
203            self.execute_transaction_async(con).await?
204        } else {
205            self.execute_pipelined_async(con).await?
206        };
207        from_owned_redis_value(value.extract_error()?)
208    }
209
210    /// This is a shortcut to `query()` that does not return a value and
211    /// will fail the task if the query of the pipeline fails.
212    ///
213    /// This is equivalent to a call of query like this:
214    ///
215    /// ```rust,no_run
216    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
217    /// # let mut con = client.get_connection().unwrap();
218    /// redis::pipe().cmd("PING").query::<()>(&mut con).unwrap();
219    /// ```
220    ///
221    /// NOTE: A Pipeline object may be reused after `query()` with all the commands as were inserted
222    ///       to them. In order to clear a Pipeline object with minimal memory released/allocated,
223    ///       it is necessary to call the `clear()` before inserting new commands.
224    #[inline]
225    #[deprecated(note = "Use Cmd::exec + unwrap, instead")]
226    pub fn execute(&self, con: &mut dyn ConnectionLike) {
227        self.exec(con).unwrap();
228    }
229
230    /// This is an alternative to `query`` that can be used if you want to be able to handle a
231    /// command's success or failure but don't care about the command's response. For example,
232    /// this is useful for "SET" commands for which the response's content is not important.
233    /// It avoids the need to define generic bounds for ().
234    #[inline]
235    pub fn exec(&self, con: &mut dyn ConnectionLike) -> RedisResult<()> {
236        self.query::<()>(con)
237    }
238
239    /// This is an alternative to `query_async` that can be used if you want to be able to handle a
240    /// command's success or failure but don't care about the command's response. For example,
241    /// this is useful for "SET" commands for which the response's content is not important.
242    /// It avoids the need to define generic bounds for ().
243    #[cfg(feature = "aio")]
244    pub async fn exec_async(&self, con: &mut impl crate::aio::ConnectionLike) -> RedisResult<()> {
245        self.query_async::<()>(con).await
246    }
247}
248
249fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec<u8> {
250    let mut rv = vec![];
251    write_pipeline(&mut rv, cmds, atomic);
252    rv
253}
254
255fn write_pipeline(rv: &mut Vec<u8>, cmds: &[Cmd], atomic: bool) {
256    let cmds_len = cmds.iter().map(cmd_len).sum();
257
258    if atomic {
259        let multi = cmd("MULTI");
260        let exec = cmd("EXEC");
261        rv.reserve(cmd_len(&multi) + cmd_len(&exec) + cmds_len);
262
263        multi.write_packed_command_preallocated(rv);
264        for cmd in cmds {
265            cmd.write_packed_command_preallocated(rv);
266        }
267        exec.write_packed_command_preallocated(rv);
268    } else {
269        rv.reserve(cmds_len);
270
271        for cmd in cmds {
272            cmd.write_packed_command_preallocated(rv);
273        }
274    }
275}
276
277// Macro to implement shared methods between Pipeline and ClusterPipeline
278macro_rules! implement_pipeline_commands {
279    ($struct_name:ident) => {
280        impl $struct_name {
281            /// Adds a command to the cluster pipeline.
282            #[inline]
283            pub fn add_command(&mut self, cmd: Cmd) -> &mut Self {
284                self.commands.push(cmd);
285                self
286            }
287
288            /// Starts a new command. Functions such as `arg` then become
289            /// available to add more arguments to that command.
290            #[inline]
291            pub fn cmd(&mut self, name: &str) -> &mut Self {
292                self.add_command(cmd(name))
293            }
294
295            /// Returns an iterator over all the commands currently in this pipeline
296            pub fn cmd_iter(&self) -> impl Iterator<Item = &Cmd> {
297                self.commands.iter()
298            }
299
300            /// Instructs the pipeline to ignore the return value of this command.
301            /// It will still be ensured that it is not an error, but any successful
302            /// result is just thrown away.  This makes result processing through
303            /// tuples much easier because you do not need to handle all the items
304            /// you do not care about.
305            #[inline]
306            pub fn ignore(&mut self) -> &mut Self {
307                match self.commands.len() {
308                    0 => true,
309                    x => self.ignored_commands.insert(x - 1),
310                };
311                self
312            }
313
314            /// Adds an argument to the last started command. This works similar
315            /// to the `arg` method of the `Cmd` object.
316            ///
317            /// Note that this function fails the task if executed on an empty pipeline.
318            #[inline]
319            pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Self {
320                {
321                    let cmd = self.get_last_command();
322                    cmd.arg(arg);
323                }
324                self
325            }
326
327            /// Clear a pipeline object's internal data structure.
328            ///
329            /// This allows reusing a pipeline object as a clear object while performing a minimal
330            /// amount of memory released/reallocated.
331            #[inline]
332            pub fn clear(&mut self) {
333                self.commands.clear();
334                self.ignored_commands.clear();
335            }
336
337            #[inline]
338            fn get_last_command(&mut self) -> &mut Cmd {
339                let idx = match self.commands.len() {
340                    0 => panic!("No command on stack"),
341                    x => x - 1,
342                };
343                &mut self.commands[idx]
344            }
345
346            fn make_pipeline_results(&self, resp: Vec<Value>) -> RedisResult<Value> {
347                let resp = Value::extract_error_vec(resp)?;
348
349                let mut rv = Vec::with_capacity(resp.len() - self.ignored_commands.len());
350                for (idx, result) in resp.into_iter().enumerate() {
351                    if !self.ignored_commands.contains(&idx) {
352                        rv.push(result);
353                    }
354                }
355                Ok(Value::Array(rv))
356            }
357        }
358
359        impl Default for $struct_name {
360            fn default() -> Self {
361                Self::new()
362            }
363        }
364    };
365}
366
367implement_pipeline_commands!(Pipeline);
368
369// Defines caching related functions for Pipeline, ClusterPipeline isn't supported yet.
370impl Pipeline {
371    /// Changes caching behaviour for latest command in the pipeline.
372    #[cfg(feature = "cache-aio")]
373    #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
374    pub fn set_cache_config(&mut self, command_cache_config: CommandCacheConfig) -> &mut Self {
375        let cmd = self.get_last_command();
376        cmd.set_cache_config(command_cache_config);
377        self
378    }
379}