redis/
pipeline.rs

1#![macro_use]
2
3#[cfg(feature = "cache-aio")]
4use crate::cmd::CommandCacheConfig;
5use crate::cmd::{cmd, cmd_len, Cmd};
6use crate::connection::ConnectionLike;
7use crate::types::{
8    from_owned_redis_value, ErrorKind, FromRedisValue, HashSet, RedisResult, ToRedisArgs, Value,
9};
10
/// Represents a redis command pipeline.
#[derive(Clone)]
pub struct Pipeline {
    /// Commands queued in the pipeline, in the order they were added.
    pub(crate) commands: Vec<Cmd>,
    /// Whether the queued commands are enclosed in `MULTI`/`EXEC` when sent.
    pub(crate) transaction_mode: bool,
    /// Indices into `commands` whose replies are left out of the query result.
    pub(crate) ignored_commands: HashSet<usize>,
}
18
19/// A pipeline allows you to send multiple commands in one go to the
20/// redis server.  API wise it's very similar to just using a command
21/// but it allows multiple commands to be chained and some features such
22/// as iteration are not available.
23///
24/// Basic example:
25///
26/// ```rust,no_run
27/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
28/// # let mut con = client.get_connection().unwrap();
29/// let ((k1, k2),) : ((i32, i32),) = redis::pipe()
30///     .cmd("SET").arg("key_1").arg(42).ignore()
31///     .cmd("SET").arg("key_2").arg(43).ignore()
32///     .cmd("MGET").arg(&["key_1", "key_2"]).query(&mut con).unwrap();
33/// ```
34///
35/// As you can see with `cmd` you can start a new command.  By default
36/// each command produces a value but for some you can ignore them by
37/// calling `ignore` on the command.  That way it will be skipped in the
38/// return value which is useful for `SET` commands and others, which
39/// do not have a useful return value.
40impl Pipeline {
41    /// Creates an empty pipeline.  For consistency with the `cmd`
42    /// api a `pipe` function is provided as alias.
43    pub fn new() -> Pipeline {
44        Self::with_capacity(0)
45    }
46
47    /// Creates an empty pipeline with pre-allocated capacity.
48    pub fn with_capacity(capacity: usize) -> Pipeline {
49        Pipeline {
50            commands: Vec::with_capacity(capacity),
51            transaction_mode: false,
52            ignored_commands: HashSet::new(),
53        }
54    }
55
56    /// This enables atomic mode.  In atomic mode the whole pipeline is
57    /// enclosed in `MULTI`/`EXEC`.  From the user's point of view nothing
58    /// changes however.  This is easier than using `MULTI`/`EXEC` yourself
59    /// as the format does not change.
60    ///
61    /// ```rust,no_run
62    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
63    /// # let mut con = client.get_connection().unwrap();
64    /// let (k1, k2) : (i32, i32) = redis::pipe()
65    ///     .atomic()
66    ///     .cmd("GET").arg("key_1")
67    ///     .cmd("GET").arg("key_2").query(&mut con).unwrap();
68    /// ```
69    #[inline]
70    pub fn atomic(&mut self) -> &mut Pipeline {
71        self.transaction_mode = true;
72        self
73    }
74
75    /// Returns `true` if the pipeline is in transaction mode (aka atomic mode).
76    #[cfg(feature = "aio")]
77    pub fn is_transaction(&self) -> bool {
78        self.transaction_mode
79    }
80
81    /// Returns the encoded pipeline commands.
82    pub fn get_packed_pipeline(&self) -> Vec<u8> {
83        encode_pipeline(&self.commands, self.transaction_mode)
84    }
85
86    /// Returns the number of commands currently queued by the usr in the pipeline.
87    ///
88    /// Depending on its configuration (e.g. `atomic`), the pipeline may send more commands to the server than the returned length
89    pub fn len(&self) -> usize {
90        self.commands.len()
91    }
92
93    /// Returns `true` is the pipeline contains no elements.
94    pub fn is_empty(&self) -> bool {
95        self.commands.is_empty()
96    }
97
98    fn execute_pipelined(&self, con: &mut dyn ConnectionLike) -> RedisResult<Value> {
99        self.make_pipeline_results(con.req_packed_commands(
100            &encode_pipeline(&self.commands, false),
101            0,
102            self.commands.len(),
103        )?)
104    }
105
106    fn execute_transaction(&self, con: &mut dyn ConnectionLike) -> RedisResult<Value> {
107        let mut resp = con.req_packed_commands(
108            &encode_pipeline(&self.commands, true),
109            self.commands.len() + 1,
110            1,
111        )?;
112
113        match resp.pop() {
114            Some(Value::Nil) => Ok(Value::Nil),
115            Some(Value::Array(items)) => self.make_pipeline_results(items),
116            _ => fail!((
117                ErrorKind::ResponseError,
118                "Invalid response when parsing multi response"
119            )),
120        }
121    }
122
123    /// Executes the pipeline and fetches the return values.  Since most
124    /// pipelines return different types it's recommended to use tuple
125    /// matching to process the results:
126    ///
127    /// ```rust,no_run
128    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
129    /// # let mut con = client.get_connection().unwrap();
130    /// let (k1, k2) : (i32, i32) = redis::pipe()
131    ///     .cmd("SET").arg("key_1").arg(42).ignore()
132    ///     .cmd("SET").arg("key_2").arg(43).ignore()
133    ///     .cmd("GET").arg("key_1")
134    ///     .cmd("GET").arg("key_2").query(&mut con).unwrap();
135    /// ```
136    ///
137    /// NOTE: A Pipeline object may be reused after `query()` with all the commands as were inserted
138    ///       to them. In order to clear a Pipeline object with minimal memory released/allocated,
139    ///       it is necessary to call the `clear()` before inserting new commands.
140    #[inline]
141    pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
142        if !con.supports_pipelining() {
143            fail!((
144                ErrorKind::ResponseError,
145                "This connection does not support pipelining."
146            ));
147        }
148        let value = if self.commands.is_empty() {
149            Value::Array(vec![])
150        } else if self.transaction_mode {
151            self.execute_transaction(con)?
152        } else {
153            self.execute_pipelined(con)?
154        };
155
156        from_owned_redis_value(value.extract_error()?)
157    }
158
159    #[cfg(feature = "aio")]
160    async fn execute_pipelined_async<C>(&self, con: &mut C) -> RedisResult<Value>
161    where
162        C: crate::aio::ConnectionLike,
163    {
164        // As the pipeline is not a transaction, the pipeline's commands got executed as is, and we
165        // can grab the results one by one (offset 0, and count is the commands' length).
166        let value = con
167            .req_packed_commands(self, 0, self.commands.len())
168            .await?;
169        self.make_pipeline_results(value)
170    }
171
172    #[cfg(feature = "aio")]
173    async fn execute_transaction_async<C>(&self, con: &mut C) -> RedisResult<Value>
174    where
175        C: crate::aio::ConnectionLike,
176    {
177        // As the pipeline is a transaction pipeline, a `MULTI` got inserted in the beginning, and
178        // an `EXEC` added at the end. The `MULTI` received an `Ok` response. The individual
179        // commands received `QUEUED` responses. And the proper results of the commands are in the
180        // response to `EXEC` (A Redis Array of the results of the individual commands in the
181        // pipeline). So when getting the result, we skip straight to `EXEC`'s result (offset is
182        // length + 1), and only extract the result of the `EXEC` (count is 1).
183        let mut resp = con
184            .req_packed_commands(self, self.commands.len() + 1, 1)
185            .await?;
186        match resp.pop() {
187            Some(Value::Nil) => Ok(Value::Nil),
188            Some(Value::Array(items)) => self.make_pipeline_results(items),
189            _ => Err((
190                ErrorKind::ResponseError,
191                "Invalid response when parsing multi response",
192            )
193                .into()),
194        }
195    }
196
197    /// Async version of `query`.
198    #[inline]
199    #[cfg(feature = "aio")]
200    pub async fn query_async<T: FromRedisValue>(
201        &self,
202        con: &mut impl crate::aio::ConnectionLike,
203    ) -> RedisResult<T> {
204        let value = if self.commands.is_empty() {
205            return from_owned_redis_value(Value::Array(vec![]));
206        } else if self.transaction_mode {
207            self.execute_transaction_async(con).await?
208        } else {
209            self.execute_pipelined_async(con).await?
210        };
211        from_owned_redis_value(value.extract_error()?)
212    }
213
214    /// This is a shortcut to `query()` that does not return a value and
215    /// will fail the task if the query of the pipeline fails.
216    ///
217    /// This is equivalent to a call of query like this:
218    ///
219    /// ```rust,no_run
220    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
221    /// # let mut con = client.get_connection().unwrap();
222    /// redis::pipe().cmd("PING").query::<()>(&mut con).unwrap();
223    /// ```
224    ///
225    /// NOTE: A Pipeline object may be reused after `query()` with all the commands as were inserted
226    ///       to them. In order to clear a Pipeline object with minimal memory released/allocated,
227    ///       it is necessary to call the `clear()` before inserting new commands.
228    #[inline]
229    #[deprecated(note = "Use Cmd::exec + unwrap, instead")]
230    pub fn execute(&self, con: &mut dyn ConnectionLike) {
231        self.exec(con).unwrap();
232    }
233
234    /// This is an alternative to `query`` that can be used if you want to be able to handle a
235    /// command's success or failure but don't care about the command's response. For example,
236    /// this is useful for "SET" commands for which the response's content is not important.
237    /// It avoids the need to define generic bounds for ().
238    #[inline]
239    pub fn exec(&self, con: &mut dyn ConnectionLike) -> RedisResult<()> {
240        self.query::<()>(con)
241    }
242
243    /// This is an alternative to `query_async` that can be used if you want to be able to handle a
244    /// command's success or failure but don't care about the command's response. For example,
245    /// this is useful for "SET" commands for which the response's content is not important.
246    /// It avoids the need to define generic bounds for ().
247    #[cfg(feature = "aio")]
248    pub async fn exec_async(&self, con: &mut impl crate::aio::ConnectionLike) -> RedisResult<()> {
249        self.query_async::<()>(con).await
250    }
251}
252
253fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec<u8> {
254    let mut rv = vec![];
255    write_pipeline(&mut rv, cmds, atomic);
256    rv
257}
258
259fn write_pipeline(rv: &mut Vec<u8>, cmds: &[Cmd], atomic: bool) {
260    let cmds_len = cmds.iter().map(cmd_len).sum();
261
262    if atomic {
263        let multi = cmd("MULTI");
264        let exec = cmd("EXEC");
265        rv.reserve(cmd_len(&multi) + cmd_len(&exec) + cmds_len);
266
267        multi.write_packed_command_preallocated(rv);
268        for cmd in cmds {
269            cmd.write_packed_command_preallocated(rv);
270        }
271        exec.write_packed_command_preallocated(rv);
272    } else {
273        rv.reserve(cmds_len);
274
275        for cmd in cmds {
276            cmd.write_packed_command_preallocated(rv);
277        }
278    }
279}
280
// Macro to implement the methods shared between Pipeline and ClusterPipeline.
//
// The target struct is expected to provide `commands` and `ignored_commands` fields and a
// `new()` constructor, the way `Pipeline` does.
macro_rules! implement_pipeline_commands {
    ($struct_name:ident) => {
        impl $struct_name {
            /// Adds a command to the pipeline.
            #[inline]
            pub fn add_command(&mut self, cmd: Cmd) -> &mut Self {
                self.commands.push(cmd);
                self
            }

            /// Starts a new command. Functions such as `arg` then become
            /// available to add more arguments to that command.
            #[inline]
            pub fn cmd(&mut self, name: &str) -> &mut Self {
                self.add_command(cmd(name))
            }

            /// Returns an iterator over all the commands currently in this pipeline
            pub fn cmd_iter(&self) -> impl Iterator<Item = &Cmd> {
                self.commands.iter()
            }

            /// Instructs the pipeline to ignore the return value of this command.
            /// It will still be ensured that it is not an error, but any successful
            /// result is just thrown away.  This makes result processing through
            /// tuples much easier because you do not need to handle all the items
            /// you do not care about.
            ///
            /// Calling this on an empty pipeline does nothing.
            #[inline]
            pub fn ignore(&mut self) -> &mut Self {
                // `checked_sub` yields `None` for an empty pipeline, where there is no
                // last command to mark.
                if let Some(last_idx) = self.commands.len().checked_sub(1) {
                    self.ignored_commands.insert(last_idx);
                }
                self
            }

            /// Adds an argument to the last started command. This works similar
            /// to the `arg` method of the `Cmd` object.
            ///
            /// # Panics
            ///
            /// Panics if called on an empty pipeline.
            #[inline]
            pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Self {
                self.get_last_command().arg(arg);
                self
            }

            /// Clear a pipeline object's internal data structure.
            ///
            /// This allows reusing a pipeline object as a clear object while performing a minimal
            /// amount of memory released/reallocated.
            #[inline]
            pub fn clear(&mut self) {
                self.commands.clear();
                self.ignored_commands.clear();
            }

            /// Returns a mutable reference to the most recently added command.
            ///
            /// Panics with "No command on stack" if the pipeline is empty.
            #[inline]
            fn get_last_command(&mut self) -> &mut Cmd {
                match self.commands.last_mut() {
                    Some(last) => last,
                    None => panic!("No command on stack"),
                }
            }

            /// Builds the pipeline result from the raw per-command replies.
            ///
            /// Fails if `Value::extract_error_vec` reports an error for `resp`; otherwise
            /// returns an array holding every reply whose index was not marked via `ignore`,
            /// in the original order.
            fn make_pipeline_results(&self, resp: Vec<Value>) -> RedisResult<Value> {
                let resp = Value::extract_error_vec(resp)?;

                // Saturate instead of subtracting directly: if a connection hands back fewer
                // replies than there are ignored commands, a plain `-` would underflow.
                let kept = resp.len().saturating_sub(self.ignored_commands.len());
                let mut rv = Vec::with_capacity(kept);
                rv.extend(
                    resp.into_iter()
                        .enumerate()
                        .filter(|(idx, _)| !self.ignored_commands.contains(idx))
                        .map(|(_, result)| result),
                );
                Ok(Value::Array(rv))
            }
        }

        impl Default for $struct_name {
            /// Equivalent to calling `new()`.
            fn default() -> Self {
                Self::new()
            }
        }
    };
}
370
// Generate the shared command-building methods and the `Default` impl for `Pipeline`.
implement_pipeline_commands!(Pipeline);
372
373// Defines caching related functions for Pipeline, ClusterPipeline isn't supported yet.
374impl Pipeline {
375    /// Changes caching behaviour for latest command in the pipeline.
376    #[cfg(feature = "cache-aio")]
377    #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
378    pub fn set_cache_config(&mut self, command_cache_config: CommandCacheConfig) -> &mut Self {
379        let cmd = self.get_last_command();
380        cmd.set_cache_config(command_cache_config);
381        self
382    }
383}