redis/pipeline.rs
#![macro_use]

#[cfg(feature = "cache-aio")]
use crate::cmd::CommandCacheConfig;
use crate::cmd::{cmd, cmd_len, Cmd};
use crate::connection::ConnectionLike;
use crate::types::{
    from_owned_redis_value, ErrorKind, FromRedisValue, HashSet, RedisResult, ToRedisArgs, Value,
};

/// Represents a redis command pipeline.
#[derive(Clone)]
pub struct Pipeline {
    pub(crate) commands: Vec<Cmd>,
    pub(crate) transaction_mode: bool,
    pub(crate) ignored_commands: HashSet<usize>,
}

/// A pipeline allows you to send multiple commands to the redis server
/// in one go. API-wise it is very similar to using a single command, but
/// it allows multiple commands to be chained; some features, such as
/// iteration, are not available.
///
/// Basic example:
///
/// ```rust,no_run
/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
/// # let mut con = client.get_connection().unwrap();
/// let ((k1, k2),) : ((i32, i32),) = redis::pipe()
///     .cmd("SET").arg("key_1").arg(42).ignore()
///     .cmd("SET").arg("key_2").arg(43).ignore()
///     .cmd("MGET").arg(&["key_1", "key_2"]).query(&mut con).unwrap();
/// ```
///
/// As you can see, `cmd` starts a new command. By default each command
/// produces a value, but you can discard individual results by calling
/// `ignore` on a command. That command is then skipped in the return
/// value, which is useful for `SET` and other commands that do not have
/// a useful return value.
impl Pipeline {
    /// Creates an empty pipeline. For consistency with the `cmd` API, a
    /// `pipe` function is provided as an alias.
    pub fn new() -> Pipeline {
        Self::with_capacity(0)
    }

    /// Creates an empty pipeline with pre-allocated capacity.
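    ///
    /// A minimal sketch (the capacity is only a pre-allocation hint for the
    /// number of commands):
    ///
    /// ```rust
    /// let mut pipe = redis::Pipeline::with_capacity(2);
    /// pipe.cmd("SET").arg("key_1").arg(42).ignore()
    ///     .cmd("GET").arg("key_1");
    /// ```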
    pub fn with_capacity(capacity: usize) -> Pipeline {
        Pipeline {
            commands: Vec::with_capacity(capacity),
            transaction_mode: false,
            ignored_commands: HashSet::new(),
        }
    }

    /// Enables atomic mode. In atomic mode the whole pipeline is enclosed
    /// in `MULTI`/`EXEC`; from the user's point of view nothing else
    /// changes. This is easier than issuing `MULTI`/`EXEC` yourself, as
    /// the shape of the results stays the same.
    ///
    /// ```rust,no_run
    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
    /// # let mut con = client.get_connection().unwrap();
    /// let (k1, k2) : (i32, i32) = redis::pipe()
    ///     .atomic()
    ///     .cmd("GET").arg("key_1")
    ///     .cmd("GET").arg("key_2").query(&mut con).unwrap();
    /// ```
    #[inline]
    pub fn atomic(&mut self) -> &mut Pipeline {
        self.transaction_mode = true;
        self
    }

    #[cfg(feature = "aio")]
    pub(crate) fn is_transaction(&self) -> bool {
        self.transaction_mode
    }

    /// Returns the encoded pipeline commands.
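    ///
    /// A minimal sketch of the RESP bytes produced for a single-command
    /// pipeline (bytes shown for illustration):
    ///
    /// ```rust
    /// let mut pipe = redis::pipe();
    /// pipe.cmd("PING");
    /// assert_eq!(pipe.get_packed_pipeline(), b"*1\r\n$4\r\nPING\r\n".to_vec());
    /// ```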
    pub fn get_packed_pipeline(&self) -> Vec<u8> {
        encode_pipeline(&self.commands, self.transaction_mode)
    }

    #[cfg(feature = "aio")]
    pub(crate) fn write_packed_pipeline(&self, out: &mut Vec<u8>) {
        write_pipeline(out, &self.commands, self.transaction_mode)
    }

    /// Returns the number of commands currently queued by the user in the pipeline.
    ///
    /// Depending on its configuration (e.g. `atomic`), the pipeline may send more
    /// commands to the server than the returned length.
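    ///
    /// For example (sketch):
    ///
    /// ```rust
    /// let mut pipe = redis::pipe();
    /// assert!(pipe.is_empty());
    /// pipe.cmd("SET").arg("key_1").arg(42).ignore()
    ///     .cmd("GET").arg("key_1");
    /// assert_eq!(pipe.len(), 2);
    /// ```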
    pub fn len(&self) -> usize {
        self.commands.len()
    }

    /// Returns `true` if the pipeline contains no commands.
    pub fn is_empty(&self) -> bool {
        self.commands.is_empty()
    }

    fn execute_pipelined(&self, con: &mut dyn ConnectionLike) -> RedisResult<Value> {
        self.make_pipeline_results(con.req_packed_commands(
            &encode_pipeline(&self.commands, false),
            0,
            self.commands.len(),
        )?)
    }

    fn execute_transaction(&self, con: &mut dyn ConnectionLike) -> RedisResult<Value> {
        let mut resp = con.req_packed_commands(
            &encode_pipeline(&self.commands, true),
            self.commands.len() + 1,
            1,
        )?;

        match resp.pop() {
            Some(Value::Nil) => Ok(Value::Nil),
            Some(Value::Array(items)) => self.make_pipeline_results(items),
            _ => fail!((
                ErrorKind::ResponseError,
                "Invalid response when parsing multi response"
            )),
        }
    }

    /// Executes the pipeline and fetches the return values. Since most
    /// pipelines return different types it's recommended to use tuple
    /// matching to process the results:
    ///
    /// ```rust,no_run
    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
    /// # let mut con = client.get_connection().unwrap();
    /// let (k1, k2) : (i32, i32) = redis::pipe()
    ///     .cmd("SET").arg("key_1").arg(42).ignore()
    ///     .cmd("SET").arg("key_2").arg(43).ignore()
    ///     .cmd("GET").arg("key_1")
    ///     .cmd("GET").arg("key_2").query(&mut con).unwrap();
    /// ```
    ///
    /// NOTE: A `Pipeline` object may be reused after `query()`; it still holds all the
    /// commands that were inserted into it. To clear a `Pipeline` object while releasing
    /// and reallocating as little memory as possible, call `clear()` before inserting
    /// new commands.
    #[inline]
    pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
        if !con.supports_pipelining() {
            fail!((
                ErrorKind::ResponseError,
                "This connection does not support pipelining."
            ));
        }
        let value = if self.commands.is_empty() {
            Value::Array(vec![])
        } else if self.transaction_mode {
            self.execute_transaction(con)?
        } else {
            self.execute_pipelined(con)?
        };

        from_owned_redis_value(value.extract_error()?)
    }

    #[cfg(feature = "aio")]
    async fn execute_pipelined_async<C>(&self, con: &mut C) -> RedisResult<Value>
    where
        C: crate::aio::ConnectionLike,
    {
        let value = con
            .req_packed_commands(self, 0, self.commands.len())
            .await?;
        self.make_pipeline_results(value)
    }

    #[cfg(feature = "aio")]
    async fn execute_transaction_async<C>(&self, con: &mut C) -> RedisResult<Value>
    where
        C: crate::aio::ConnectionLike,
    {
        let mut resp = con
            .req_packed_commands(self, self.commands.len() + 1, 1)
            .await?;
        match resp.pop() {
            Some(Value::Nil) => Ok(Value::Nil),
            Some(Value::Array(items)) => self.make_pipeline_results(items),
            _ => Err((
                ErrorKind::ResponseError,
                "Invalid response when parsing multi response",
            )
                .into()),
        }
    }

    /// Async version of `query`.
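    ///
    /// A minimal sketch, assuming the connection is obtained via
    /// `get_multiplexed_async_connection` (which requires an async runtime
    /// feature such as `tokio-comp`):
    ///
    /// ```rust,no_run
    /// # async fn example() -> redis::RedisResult<()> {
    /// let client = redis::Client::open("redis://127.0.0.1/")?;
    /// let mut con = client.get_multiplexed_async_connection().await?;
    /// let (k1, k2): (i32, i32) = redis::pipe()
    ///     .cmd("SET").arg("key_1").arg(42).ignore()
    ///     .cmd("SET").arg("key_2").arg(43).ignore()
    ///     .cmd("GET").arg("key_1")
    ///     .cmd("GET").arg("key_2")
    ///     .query_async(&mut con)
    ///     .await?;
    /// # Ok(()) }
    /// ```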
    #[inline]
    #[cfg(feature = "aio")]
    pub async fn query_async<T: FromRedisValue>(
        &self,
        con: &mut impl crate::aio::ConnectionLike,
    ) -> RedisResult<T> {
        let value = if self.commands.is_empty() {
            return from_owned_redis_value(Value::Array(vec![]));
        } else if self.transaction_mode {
            self.execute_transaction_async(con).await?
        } else {
            self.execute_pipelined_async(con).await?
        };
        from_owned_redis_value(value.extract_error()?)
    }

    /// This is a shortcut to `query()` that does not return a value and
    /// will panic if the query of the pipeline fails.
    ///
    /// It is equivalent to a call of `query()` like this:
    ///
    /// ```rust,no_run
    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
    /// # let mut con = client.get_connection().unwrap();
    /// redis::pipe().cmd("PING").query::<()>(&mut con).unwrap();
    /// ```
    ///
    /// NOTE: A `Pipeline` object may be reused after `query()`; it still holds all the
    /// commands that were inserted into it. To clear a `Pipeline` object while releasing
    /// and reallocating as little memory as possible, call `clear()` before inserting
    /// new commands.
    #[inline]
    #[deprecated(note = "Use Pipeline::exec + unwrap, instead")]
    pub fn execute(&self, con: &mut dyn ConnectionLike) {
        self.exec(con).unwrap();
    }

    /// This is an alternative to `query` that can be used if you want to be able to handle a
    /// command's success or failure but don't care about the command's response. For example,
    /// this is useful for "SET" commands for which the response's content is not important.
    /// It avoids the need to define generic bounds for `()`.
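    ///
    /// A minimal sketch (assumes a reachable server):
    ///
    /// ```rust,no_run
    /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
    /// # let mut con = client.get_connection().unwrap();
    /// redis::pipe()
    ///     .cmd("SET").arg("key_1").arg(42)
    ///     .cmd("SET").arg("key_2").arg(43)
    ///     .exec(&mut con)
    ///     .unwrap();
    /// ```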
    #[inline]
    pub fn exec(&self, con: &mut dyn ConnectionLike) -> RedisResult<()> {
        self.query::<()>(con)
    }

    /// This is an alternative to `query_async` that can be used if you want to be able to handle a
    /// command's success or failure but don't care about the command's response. For example,
    /// this is useful for "SET" commands for which the response's content is not important.
    /// It avoids the need to define generic bounds for `()`.
    #[cfg(feature = "aio")]
    pub async fn exec_async(&self, con: &mut impl crate::aio::ConnectionLike) -> RedisResult<()> {
        self.query_async::<()>(con).await
    }
}

fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec<u8> {
    let mut rv = vec![];
    write_pipeline(&mut rv, cmds, atomic);
    rv
}

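// Serializes `cmds` into `rv`, wrapping them in MULTI/EXEC when `atomic` is set.
// The exact encoded length is reserved up front so the commands are written
// without intermediate reallocations.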
fn write_pipeline(rv: &mut Vec<u8>, cmds: &[Cmd], atomic: bool) {
    let cmds_len = cmds.iter().map(cmd_len).sum();

    if atomic {
        let multi = cmd("MULTI");
        let exec = cmd("EXEC");
        rv.reserve(cmd_len(&multi) + cmd_len(&exec) + cmds_len);

        multi.write_packed_command_preallocated(rv);
        for cmd in cmds {
            cmd.write_packed_command_preallocated(rv);
        }
        exec.write_packed_command_preallocated(rv);
    } else {
        rv.reserve(cmds_len);

        for cmd in cmds {
            cmd.write_packed_command_preallocated(rv);
        }
    }
}

// Macro to implement the methods shared between Pipeline and ClusterPipeline.
macro_rules! implement_pipeline_commands {
    ($struct_name:ident) => {
        impl $struct_name {
            /// Adds a command to the pipeline.
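            ///
            /// A minimal sketch using a pre-built `Cmd`:
            ///
            /// ```rust
            /// let mut pipe = redis::pipe();
            /// pipe.add_command(redis::cmd("PING"));
            /// assert_eq!(pipe.len(), 1);
            /// ```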
            #[inline]
            pub fn add_command(&mut self, cmd: Cmd) -> &mut Self {
                self.commands.push(cmd);
                self
            }

            /// Starts a new command. Functions such as `arg` then become
            /// available to add more arguments to that command.
            #[inline]
            pub fn cmd(&mut self, name: &str) -> &mut Self {
                self.add_command(cmd(name))
            }

            /// Returns an iterator over all the commands currently in this pipeline.
            pub fn cmd_iter(&self) -> impl Iterator<Item = &Cmd> {
                self.commands.iter()
            }

            /// Instructs the pipeline to ignore the return value of this command.
            /// It will still be ensured that it is not an error, but any successful
            /// result is just thrown away. This makes result processing through
            /// tuples much easier because you do not need to handle all the items
            /// you do not care about.
            #[inline]
            pub fn ignore(&mut self) -> &mut Self {
                match self.commands.len() {
                    0 => true,
                    x => self.ignored_commands.insert(x - 1),
                };
                self
            }

            /// Adds an argument to the last started command. This works similarly
            /// to the `arg` method of the `Cmd` object.
            ///
            /// Note that this function panics if executed on an empty pipeline.
            #[inline]
            pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Self {
                {
                    let cmd = self.get_last_command();
                    cmd.arg(arg);
                }
                self
            }

            /// Clears the pipeline's internal data structures.
            ///
            /// This allows reusing a pipeline object while releasing and
            /// reallocating only a minimal amount of memory.
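            ///
            /// A minimal reuse sketch:
            ///
            /// ```rust
            /// let mut pipe = redis::pipe();
            /// pipe.cmd("SET").arg("key_1").arg(42).ignore();
            /// // ... execute the pipeline against a connection, then reuse it:
            /// pipe.clear();
            /// assert!(pipe.is_empty());
            /// ```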
            #[inline]
            pub fn clear(&mut self) {
                self.commands.clear();
                self.ignored_commands.clear();
            }

            #[inline]
            fn get_last_command(&mut self) -> &mut Cmd {
                let idx = match self.commands.len() {
                    0 => panic!("No command on stack"),
                    x => x - 1,
                };
                &mut self.commands[idx]
            }

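            // Collapses the per-command replies into a single `Value::Array`,
            // surfacing any error reply and dropping the results of commands
            // marked with `ignore`.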
            fn make_pipeline_results(&self, resp: Vec<Value>) -> RedisResult<Value> {
                let resp = Value::extract_error_vec(resp)?;

                let mut rv = Vec::with_capacity(resp.len() - self.ignored_commands.len());
                for (idx, result) in resp.into_iter().enumerate() {
                    if !self.ignored_commands.contains(&idx) {
                        rv.push(result);
                    }
                }
                Ok(Value::Array(rv))
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self::new()
            }
        }
    };
}

implement_pipeline_commands!(Pipeline);

// Defines caching-related functions for Pipeline; ClusterPipeline isn't supported yet.
impl Pipeline {
    /// Changes the caching behaviour for the latest command in the pipeline.
    #[cfg(feature = "cache-aio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
    pub fn set_cache_config(&mut self, command_cache_config: CommandCacheConfig) -> &mut Self {
        let cmd = self.get_last_command();
        cmd.set_cache_config(command_cache_config);
        self
    }
}