redis/aio/connection.rs
1use super::AsyncDNSResolver;
2use super::RedisRuntime;
3
4use crate::connection::{ConnectionAddr, ConnectionInfo};
5#[cfg(feature = "aio")]
6use crate::types::RedisResult;
7
8use super::ConnectionLike;
9use crate::cmd::{cmd, pipe};
10use crate::pipeline::Pipeline;
11use crate::{FromRedisValue, RedisError, ToRedisArgs};
12use futures_util::future::select_ok;
13use std::future::Future;
14
15pub(crate) async fn connect_simple<T: RedisRuntime>(
16 connection_info: &ConnectionInfo,
17 dns_resolver: &dyn AsyncDNSResolver,
18) -> RedisResult<T> {
19 Ok(match connection_info.addr {
20 ConnectionAddr::Tcp(ref host, port) => {
21 let socket_addrs = dns_resolver.resolve(host, port).await?;
22 select_ok(
23 socket_addrs
24 .map(|addr| Box::pin(<T>::connect_tcp(addr, &connection_info.tcp_settings))),
25 )
26 .await?
27 .0
28 }
29
30 #[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
31 ConnectionAddr::TcpTls {
32 ref host,
33 port,
34 insecure,
35 ref tls_params,
36 } => {
37 let socket_addrs = dns_resolver.resolve(host, port).await?;
38 select_ok(socket_addrs.map(|socket_addr| {
39 Box::pin(<T>::connect_tcp_tls(
40 host,
41 socket_addr,
42 insecure,
43 tls_params,
44 &connection_info.tcp_settings,
45 ))
46 }))
47 .await?
48 .0
49 }
50
51 #[cfg(not(any(feature = "tls-native-tls", feature = "tls-rustls")))]
52 ConnectionAddr::TcpTls { .. } => {
53 fail!((
54 crate::errors::ErrorKind::InvalidClientConfig,
55 "Cannot connect to TCP with TLS without the tls feature"
56 ));
57 }
58
59 #[cfg(unix)]
60 ConnectionAddr::Unix(ref path) => <T>::connect_unix(path).await?,
61
62 #[cfg(not(unix))]
63 ConnectionAddr::Unix(_) => {
64 fail!((
65 crate::errors::ErrorKind::InvalidClientConfig,
66 "Cannot connect to unix sockets \
67 on this platform",
68 ))
69 }
70 })
71}
72
73/// Executes a Redis transaction asynchronously by automatically watching keys and running
74/// a transaction loop until it succeeds. Similar to the synchronous [`transaction`](crate::transaction)
75/// function but for async execution.
76///
77/// The provided closure may be executed multiple times if the transaction fails due to
78/// watched keys being modified between WATCH and EXEC. Any side effects in the closure
79/// should account for possible multiple executions. The closure should return `Ok(None)` to indicate a transaction failure and to
80/// retry (this will happen automatically if the last call in the closure is to run the transaction), or `Err(err)` to abort the
81/// transaction with an error. A successful transaction should return `Ok(Some(value))` with the desired result from the EXEC command.
82///
83/// # Examples
84///
85/// ```rust,no_run
86/// use redis::{AsyncCommands, RedisResult, pipe};
87///
88/// async fn increment(con: redis::aio::MultiplexedConnection) -> RedisResult<isize> {
89/// let key = "my_counter";
90/// redis::aio::transaction_async(con, &[key], |mut con, mut pipe| async move {
91/// // Read the current value first
92/// let val: isize = con.get(key).await?;
93/// // Build the pipeline and execute it atomically (MULTI/EXEC are added automatically)
94/// pipe.set(key, val + 1)
95/// .ignore()
96/// .get(key)
97/// .query_async(&mut con)
98/// .await
99/// })
100/// .await
101/// }
102/// ```
103///
104/// # Notes
105///
106/// - The closure may be executed multiple times if watched keys are modified by other
107/// clients between `WATCH` and `EXEC`; its side effects must be idempotent.
108/// - A successful `EXEC` automatically discards all `WATCH`es, so no explicit `UNWATCH`
109/// is needed on the success path.
110/// - The transaction is automatically abandoned if the closure returns an error; an
111/// explicit `UNWATCH` is sent in that case to leave the connection in a clean state.
112///
113/// ## Warning: Concurrent Transactions on Multiplexed Connections
114///
115/// When using a multiplexed connection (e.g. async connection types in this crate),
116/// cloning shares the underlying channel. Running concurrent transactions on clones of
117/// the same multiplexed connection could lead to unexpected behavior: the
118/// `WATCH`/`MULTI`/`EXEC` sequence from one transaction may interleave with commands from
119/// another. Ensure at most one transaction is active on a given multiplexed
120/// connection at a time.
121///
122/// ## Warning: Transactions on cluster connections
123///
124/// A cluster connection is a collection of multiple underlying connections to different
125/// cluster nodes. Running a transaction on a cluster connection is only safe if all the
126/// keys being watched and modified in the transaction are guaranteed to be on the same
127/// cluster node, since Redis transactions cannot span multiple nodes. It is the caller's
128/// responsibility to ensure this condition is met when using `transaction_async` with a
129/// cluster connection.
130///
131/// For more details on Redis transactions, see the [Redis documentation](https://redis.io/topics/transactions)
132pub async fn transaction_async<
133 C: ConnectionLike + Clone,
134 K: ToRedisArgs,
135 T: FromRedisValue,
136 F: FnMut(C, Pipeline) -> Fut,
137 Fut: Future<Output = Result<Option<T>, RedisError>>,
138>(
139 mut connection: C,
140 keys: &[K],
141 mut func: F,
142) -> Result<T, RedisError> {
143 if keys.is_empty() {
144 fail!((
145 crate::errors::ErrorKind::InvalidClientConfig,
146 "At least one key must be provided to watch for transactions"
147 ));
148 }
149 loop {
150 cmd("WATCH").arg(keys).exec_async(&mut connection).await?;
151
152 let mut pipeline = pipe();
153 pipeline.atomic();
154 let response = func(connection.clone(), pipeline).await;
155 // Send UNWATCH as a best-effort safety net for any edge cases where EXEC
156 // was not reached (e.g. the closure returned None before calling query_async).
157 let _ = cmd("UNWATCH").exec_async(&mut connection).await;
158 if let Some(result) = response? {
159 return Ok(result);
160 }
161 }
162}
163
164#[cfg(test)]
165mod tests {
166 #[cfg(feature = "cluster-async")]
167 use crate::cluster_async;
168
169 use super::super::*;
170
171 #[test]
172 fn test_is_sync() {
173 const fn assert_sync<T: Sync>() {}
174
175 assert_sync::<MultiplexedConnection>();
176 assert_sync::<PubSub>();
177 assert_sync::<Monitor>();
178 #[cfg(feature = "connection-manager")]
179 assert_sync::<ConnectionManager>();
180 #[cfg(feature = "cluster-async")]
181 assert_sync::<cluster_async::ClusterConnection>();
182 }
183
184 #[test]
185 fn test_is_send() {
186 const fn assert_send<T: Send>() {}
187
188 assert_send::<MultiplexedConnection>();
189 assert_send::<PubSub>();
190 assert_send::<Monitor>();
191 #[cfg(feature = "connection-manager")]
192 assert_send::<ConnectionManager>();
193 #[cfg(feature = "cluster-async")]
194 assert_send::<cluster_async::ClusterConnection>();
195 }
196}