redis/pipeline.rs
1#![macro_use]
2
3#[cfg(feature = "cache-aio")]
4use crate::cmd::CommandCacheConfig;
5use crate::cmd::{cmd, cmd_len, Cmd};
6use crate::connection::ConnectionLike;
7use crate::types::{
8 from_owned_redis_value, ErrorKind, FromRedisValue, HashSet, RedisResult, ToRedisArgs, Value,
9};
10
11/// Represents a redis command pipeline.
12#[derive(Clone)]
13pub struct Pipeline {
14 pub(crate) commands: Vec<Cmd>,
15 pub(crate) transaction_mode: bool,
16 pub(crate) ignored_commands: HashSet<usize>,
17}
18
19/// A pipeline allows you to send multiple commands in one go to the
20/// redis server. API wise it's very similar to just using a command
21/// but it allows multiple commands to be chained and some features such
22/// as iteration are not available.
23///
24/// Basic example:
25///
26/// ```rust,no_run
27/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
28/// # let mut con = client.get_connection().unwrap();
29/// let ((k1, k2),) : ((i32, i32),) = redis::pipe()
30/// .cmd("SET").arg("key_1").arg(42).ignore()
31/// .cmd("SET").arg("key_2").arg(43).ignore()
32/// .cmd("MGET").arg(&["key_1", "key_2"]).query(&mut con).unwrap();
33/// ```
34///
35/// As you can see with `cmd` you can start a new command. By default
36/// each command produces a value but for some you can ignore them by
37/// calling `ignore` on the command. That way it will be skipped in the
38/// return value which is useful for `SET` commands and others, which
39/// do not have a useful return value.
40impl Pipeline {
41 /// Creates an empty pipeline. For consistency with the `cmd`
42 /// api a `pipe` function is provided as alias.
43 pub fn new() -> Pipeline {
44 Self::with_capacity(0)
45 }
46
47 /// Creates an empty pipeline with pre-allocated capacity.
48 pub fn with_capacity(capacity: usize) -> Pipeline {
49 Pipeline {
50 commands: Vec::with_capacity(capacity),
51 transaction_mode: false,
52 ignored_commands: HashSet::new(),
53 }
54 }
55
56 /// This enables atomic mode. In atomic mode the whole pipeline is
57 /// enclosed in `MULTI`/`EXEC`. From the user's point of view nothing
58 /// changes however. This is easier than using `MULTI`/`EXEC` yourself
59 /// as the format does not change.
60 ///
61 /// ```rust,no_run
62 /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
63 /// # let mut con = client.get_connection().unwrap();
64 /// let (k1, k2) : (i32, i32) = redis::pipe()
65 /// .atomic()
66 /// .cmd("GET").arg("key_1")
67 /// .cmd("GET").arg("key_2").query(&mut con).unwrap();
68 /// ```
69 #[inline]
70 pub fn atomic(&mut self) -> &mut Pipeline {
71 self.transaction_mode = true;
72 self
73 }
74
75 /// Returns `true` if the pipeline is in transaction mode (aka atomic mode).
76 #[cfg(feature = "aio")]
77 pub fn is_transaction(&self) -> bool {
78 self.transaction_mode
79 }
80
81 /// Returns the encoded pipeline commands.
82 pub fn get_packed_pipeline(&self) -> Vec<u8> {
83 encode_pipeline(&self.commands, self.transaction_mode)
84 }
85
86 /// Returns the number of commands currently queued by the usr in the pipeline.
87 ///
88 /// Depending on its configuration (e.g. `atomic`), the pipeline may send more commands to the server than the returned length
89 pub fn len(&self) -> usize {
90 self.commands.len()
91 }
92
93 /// Returns `true` is the pipeline contains no elements.
94 pub fn is_empty(&self) -> bool {
95 self.commands.is_empty()
96 }
97
98 fn execute_pipelined(&self, con: &mut dyn ConnectionLike) -> RedisResult<Value> {
99 self.make_pipeline_results(con.req_packed_commands(
100 &encode_pipeline(&self.commands, false),
101 0,
102 self.commands.len(),
103 )?)
104 }
105
106 fn execute_transaction(&self, con: &mut dyn ConnectionLike) -> RedisResult<Value> {
107 let mut resp = con.req_packed_commands(
108 &encode_pipeline(&self.commands, true),
109 self.commands.len() + 1,
110 1,
111 )?;
112
113 match resp.pop() {
114 Some(Value::Nil) => Ok(Value::Nil),
115 Some(Value::Array(items)) => self.make_pipeline_results(items),
116 _ => fail!((
117 ErrorKind::ResponseError,
118 "Invalid response when parsing multi response"
119 )),
120 }
121 }
122
123 /// Executes the pipeline and fetches the return values. Since most
124 /// pipelines return different types it's recommended to use tuple
125 /// matching to process the results:
126 ///
127 /// ```rust,no_run
128 /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
129 /// # let mut con = client.get_connection().unwrap();
130 /// let (k1, k2) : (i32, i32) = redis::pipe()
131 /// .cmd("SET").arg("key_1").arg(42).ignore()
132 /// .cmd("SET").arg("key_2").arg(43).ignore()
133 /// .cmd("GET").arg("key_1")
134 /// .cmd("GET").arg("key_2").query(&mut con).unwrap();
135 /// ```
136 ///
137 /// NOTE: A Pipeline object may be reused after `query()` with all the commands as were inserted
138 /// to them. In order to clear a Pipeline object with minimal memory released/allocated,
139 /// it is necessary to call the `clear()` before inserting new commands.
140 #[inline]
141 pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
142 if !con.supports_pipelining() {
143 fail!((
144 ErrorKind::ResponseError,
145 "This connection does not support pipelining."
146 ));
147 }
148 let value = if self.commands.is_empty() {
149 Value::Array(vec![])
150 } else if self.transaction_mode {
151 self.execute_transaction(con)?
152 } else {
153 self.execute_pipelined(con)?
154 };
155
156 from_owned_redis_value(value.extract_error()?)
157 }
158
159 #[cfg(feature = "aio")]
160 async fn execute_pipelined_async<C>(&self, con: &mut C) -> RedisResult<Value>
161 where
162 C: crate::aio::ConnectionLike,
163 {
164 // As the pipeline is not a transaction, the pipeline's commands got executed as is, and we
165 // can grab the results one by one (offset 0, and count is the commands' length).
166 let value = con
167 .req_packed_commands(self, 0, self.commands.len())
168 .await?;
169 self.make_pipeline_results(value)
170 }
171
172 #[cfg(feature = "aio")]
173 async fn execute_transaction_async<C>(&self, con: &mut C) -> RedisResult<Value>
174 where
175 C: crate::aio::ConnectionLike,
176 {
177 // As the pipeline is a transaction pipeline, a `MULTI` got inserted in the beginning, and
178 // an `EXEC` added at the end. The `MULTI` received an `Ok` response. The individual
179 // commands received `QUEUED` responses. And the proper results of the commands are in the
180 // response to `EXEC` (A Redis Array of the results of the individual commands in the
181 // pipeline). So when getting the result, we skip straight to `EXEC`'s result (offset is
182 // length + 1), and only extract the result of the `EXEC` (count is 1).
183 let mut resp = con
184 .req_packed_commands(self, self.commands.len() + 1, 1)
185 .await?;
186 match resp.pop() {
187 Some(Value::Nil) => Ok(Value::Nil),
188 Some(Value::Array(items)) => self.make_pipeline_results(items),
189 _ => Err((
190 ErrorKind::ResponseError,
191 "Invalid response when parsing multi response",
192 )
193 .into()),
194 }
195 }
196
197 /// Async version of `query`.
198 #[inline]
199 #[cfg(feature = "aio")]
200 pub async fn query_async<T: FromRedisValue>(
201 &self,
202 con: &mut impl crate::aio::ConnectionLike,
203 ) -> RedisResult<T> {
204 let value = if self.commands.is_empty() {
205 return from_owned_redis_value(Value::Array(vec![]));
206 } else if self.transaction_mode {
207 self.execute_transaction_async(con).await?
208 } else {
209 self.execute_pipelined_async(con).await?
210 };
211 from_owned_redis_value(value.extract_error()?)
212 }
213
214 /// This is a shortcut to `query()` that does not return a value and
215 /// will fail the task if the query of the pipeline fails.
216 ///
217 /// This is equivalent to a call of query like this:
218 ///
219 /// ```rust,no_run
220 /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
221 /// # let mut con = client.get_connection().unwrap();
222 /// redis::pipe().cmd("PING").query::<()>(&mut con).unwrap();
223 /// ```
224 ///
225 /// NOTE: A Pipeline object may be reused after `query()` with all the commands as were inserted
226 /// to them. In order to clear a Pipeline object with minimal memory released/allocated,
227 /// it is necessary to call the `clear()` before inserting new commands.
228 #[inline]
229 #[deprecated(note = "Use Cmd::exec + unwrap, instead")]
230 pub fn execute(&self, con: &mut dyn ConnectionLike) {
231 self.exec(con).unwrap();
232 }
233
234 /// This is an alternative to `query`` that can be used if you want to be able to handle a
235 /// command's success or failure but don't care about the command's response. For example,
236 /// this is useful for "SET" commands for which the response's content is not important.
237 /// It avoids the need to define generic bounds for ().
238 #[inline]
239 pub fn exec(&self, con: &mut dyn ConnectionLike) -> RedisResult<()> {
240 self.query::<()>(con)
241 }
242
243 /// This is an alternative to `query_async` that can be used if you want to be able to handle a
244 /// command's success or failure but don't care about the command's response. For example,
245 /// this is useful for "SET" commands for which the response's content is not important.
246 /// It avoids the need to define generic bounds for ().
247 #[cfg(feature = "aio")]
248 pub async fn exec_async(&self, con: &mut impl crate::aio::ConnectionLike) -> RedisResult<()> {
249 self.query_async::<()>(con).await
250 }
251}
252
253fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec<u8> {
254 let mut rv = vec![];
255 write_pipeline(&mut rv, cmds, atomic);
256 rv
257}
258
259fn write_pipeline(rv: &mut Vec<u8>, cmds: &[Cmd], atomic: bool) {
260 let cmds_len = cmds.iter().map(cmd_len).sum();
261
262 if atomic {
263 let multi = cmd("MULTI");
264 let exec = cmd("EXEC");
265 rv.reserve(cmd_len(&multi) + cmd_len(&exec) + cmds_len);
266
267 multi.write_packed_command_preallocated(rv);
268 for cmd in cmds {
269 cmd.write_packed_command_preallocated(rv);
270 }
271 exec.write_packed_command_preallocated(rv);
272 } else {
273 rv.reserve(cmds_len);
274
275 for cmd in cmds {
276 cmd.write_packed_command_preallocated(rv);
277 }
278 }
279}
280
281// Macro to implement shared methods between Pipeline and ClusterPipeline
282macro_rules! implement_pipeline_commands {
283 ($struct_name:ident) => {
284 impl $struct_name {
285 /// Adds a command to the cluster pipeline.
286 #[inline]
287 pub fn add_command(&mut self, cmd: Cmd) -> &mut Self {
288 self.commands.push(cmd);
289 self
290 }
291
292 /// Starts a new command. Functions such as `arg` then become
293 /// available to add more arguments to that command.
294 #[inline]
295 pub fn cmd(&mut self, name: &str) -> &mut Self {
296 self.add_command(cmd(name))
297 }
298
299 /// Returns an iterator over all the commands currently in this pipeline
300 pub fn cmd_iter(&self) -> impl Iterator<Item = &Cmd> {
301 self.commands.iter()
302 }
303
304 /// Instructs the pipeline to ignore the return value of this command.
305 /// It will still be ensured that it is not an error, but any successful
306 /// result is just thrown away. This makes result processing through
307 /// tuples much easier because you do not need to handle all the items
308 /// you do not care about.
309 #[inline]
310 pub fn ignore(&mut self) -> &mut Self {
311 match self.commands.len() {
312 0 => true,
313 x => self.ignored_commands.insert(x - 1),
314 };
315 self
316 }
317
318 /// Adds an argument to the last started command. This works similar
319 /// to the `arg` method of the `Cmd` object.
320 ///
321 /// Note that this function fails the task if executed on an empty pipeline.
322 #[inline]
323 pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Self {
324 {
325 let cmd = self.get_last_command();
326 cmd.arg(arg);
327 }
328 self
329 }
330
331 /// Clear a pipeline object's internal data structure.
332 ///
333 /// This allows reusing a pipeline object as a clear object while performing a minimal
334 /// amount of memory released/reallocated.
335 #[inline]
336 pub fn clear(&mut self) {
337 self.commands.clear();
338 self.ignored_commands.clear();
339 }
340
341 #[inline]
342 fn get_last_command(&mut self) -> &mut Cmd {
343 let idx = match self.commands.len() {
344 0 => panic!("No command on stack"),
345 x => x - 1,
346 };
347 &mut self.commands[idx]
348 }
349
350 fn make_pipeline_results(&self, resp: Vec<Value>) -> RedisResult<Value> {
351 let resp = Value::extract_error_vec(resp)?;
352
353 let mut rv = Vec::with_capacity(resp.len() - self.ignored_commands.len());
354 for (idx, result) in resp.into_iter().enumerate() {
355 if !self.ignored_commands.contains(&idx) {
356 rv.push(result);
357 }
358 }
359 Ok(Value::Array(rv))
360 }
361 }
362
363 impl Default for $struct_name {
364 fn default() -> Self {
365 Self::new()
366 }
367 }
368 };
369}
370
371implement_pipeline_commands!(Pipeline);
372
373// Defines caching related functions for Pipeline, ClusterPipeline isn't supported yet.
374impl Pipeline {
375 /// Changes caching behaviour for latest command in the pipeline.
376 #[cfg(feature = "cache-aio")]
377 #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
378 pub fn set_cache_config(&mut self, command_cache_config: CommandCacheConfig) -> &mut Self {
379 let cmd = self.get_last_command();
380 cmd.set_cache_config(command_cache_config);
381 self
382 }
383}