Skip to main content

redis/aio/
connection.rs

1use super::AsyncDNSResolver;
2use super::RedisRuntime;
3
4use crate::connection::{ConnectionAddr, ConnectionInfo};
5#[cfg(feature = "aio")]
6use crate::types::RedisResult;
7
8use super::ConnectionLike;
9use crate::cmd::{cmd, pipe};
10use crate::pipeline::Pipeline;
11use crate::{FromRedisValue, RedisError, ToRedisArgs};
12use futures_util::future::select_ok;
13use std::future::Future;
14
15pub(crate) async fn connect_simple<T: RedisRuntime>(
16    connection_info: &ConnectionInfo,
17    dns_resolver: &dyn AsyncDNSResolver,
18) -> RedisResult<T> {
19    Ok(match connection_info.addr {
20        ConnectionAddr::Tcp(ref host, port) => {
21            let socket_addrs = dns_resolver.resolve(host, port).await?;
22            select_ok(
23                socket_addrs
24                    .map(|addr| Box::pin(<T>::connect_tcp(addr, &connection_info.tcp_settings))),
25            )
26            .await?
27            .0
28        }
29
30        #[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
31        ConnectionAddr::TcpTls {
32            ref host,
33            port,
34            insecure,
35            ref tls_params,
36        } => {
37            let socket_addrs = dns_resolver.resolve(host, port).await?;
38            select_ok(socket_addrs.map(|socket_addr| {
39                Box::pin(<T>::connect_tcp_tls(
40                    host,
41                    socket_addr,
42                    insecure,
43                    tls_params,
44                    &connection_info.tcp_settings,
45                ))
46            }))
47            .await?
48            .0
49        }
50
51        #[cfg(not(any(feature = "tls-native-tls", feature = "tls-rustls")))]
52        ConnectionAddr::TcpTls { .. } => {
53            fail!((
54                crate::errors::ErrorKind::InvalidClientConfig,
55                "Cannot connect to TCP with TLS without the tls feature"
56            ));
57        }
58
59        #[cfg(unix)]
60        ConnectionAddr::Unix(ref path) => <T>::connect_unix(path).await?,
61
62        #[cfg(not(unix))]
63        ConnectionAddr::Unix(_) => {
64            fail!((
65                crate::errors::ErrorKind::InvalidClientConfig,
66                "Cannot connect to unix sockets \
67                 on this platform",
68            ))
69        }
70    })
71}
72
73/// Executes a Redis transaction asynchronously by automatically watching keys and running
74/// a transaction loop until it succeeds. Similar to the synchronous [`transaction`](crate::transaction)
75/// function but for async execution.
76///
77/// The provided closure may be executed multiple times if the transaction fails due to
78/// watched keys being modified between WATCH and EXEC. Any side effects in the closure
79/// should account for possible multiple executions. The closure should return `Ok(None)` to indicate a transaction failure and to
80/// retry (this will happen automatically if the last call in the closure is to run the transaction), or `Err(err)` to abort the
81/// transaction with an error. A successful transaction should return `Ok(Some(value))` with the desired result from the EXEC command.
82///
83/// # Examples
84///
85/// ```rust,no_run
86/// use redis::{AsyncCommands, RedisResult, pipe};
87///
88/// async fn increment(con: redis::aio::MultiplexedConnection) -> RedisResult<isize> {
89///     let key = "my_counter";
90///     redis::aio::transaction_async(con, &[key], |mut con, mut pipe| async move {
91///         // Read the current value first
92///         let val: isize = con.get(key).await?;
93///         // Build the pipeline and execute it atomically (MULTI/EXEC are added automatically)
94///         pipe.set(key, val + 1)
95///             .ignore()
96///             .get(key)
97///             .query_async(&mut con)
98///             .await
99///     })
100///     .await
101/// }
102/// ```
103///
104/// # Notes
105///
106/// - The closure may be executed multiple times if watched keys are modified by other
107///   clients between `WATCH` and `EXEC`; its side effects must be idempotent.
108/// - A successful `EXEC` automatically discards all `WATCH`es, so no explicit `UNWATCH`
109///   is needed on the success path.
110/// - The transaction is automatically abandoned if the closure returns an error; an
111///   explicit `UNWATCH` is sent in that case to leave the connection in a clean state.
112///
113/// ## Warning: Concurrent Transactions on Multiplexed Connections
114///
115/// When using a multiplexed connection (e.g. async connection types in this crate),
116/// cloning shares the underlying channel. Running concurrent transactions on clones of
117/// the same multiplexed connection could lead to unexpected behavior: the
118/// `WATCH`/`MULTI`/`EXEC` sequence from one transaction may interleave with commands from
119/// another. Ensure at most one transaction is active on a given multiplexed
120/// connection at a time.
121///
122/// ## Warning: Transactions on cluster connections
123///
124/// A cluster connection is a collection of multiple underlying connections to different
125/// cluster nodes. Running a transaction on a cluster connection is only safe if all the
126/// keys being watched and modified in the transaction are guaranteed to be on the same
127/// cluster node, since Redis transactions cannot span multiple nodes. It is the caller's
128/// responsibility to ensure this condition is met when using `transaction_async` with a
129/// cluster connection.
130///
131/// For more details on Redis transactions, see the [Redis documentation](https://redis.io/topics/transactions)
132pub async fn transaction_async<
133    C: ConnectionLike + Clone,
134    K: ToRedisArgs,
135    T: FromRedisValue,
136    F: FnMut(C, Pipeline) -> Fut,
137    Fut: Future<Output = Result<Option<T>, RedisError>>,
138>(
139    mut connection: C,
140    keys: &[K],
141    mut func: F,
142) -> Result<T, RedisError> {
143    if keys.is_empty() {
144        fail!((
145            crate::errors::ErrorKind::InvalidClientConfig,
146            "At least one key must be provided to watch for transactions"
147        ));
148    }
149    loop {
150        cmd("WATCH").arg(keys).exec_async(&mut connection).await?;
151
152        let mut pipeline = pipe();
153        pipeline.atomic();
154        let response = func(connection.clone(), pipeline).await;
155        // Send UNWATCH as a best-effort safety net for any edge cases where EXEC
156        // was not reached (e.g. the closure returned None before calling query_async).
157        let _ = cmd("UNWATCH").exec_async(&mut connection).await;
158        if let Some(result) = response? {
159            return Ok(result);
160        }
161    }
162}
163
#[cfg(test)]
mod tests {
    use super::super::*;

    #[cfg(feature = "cluster-async")]
    use crate::cluster_async;

    /// Compiles only when `T` is `Sync`; does nothing at runtime.
    const fn require_sync<T: Sync>() {}

    /// Compiles only when `T` is `Send`; does nothing at runtime.
    const fn require_send<T: Send>() {}

    /// Compile-time check that the async connection types are `Sync`.
    #[test]
    fn test_is_sync() {
        require_sync::<MultiplexedConnection>();
        require_sync::<PubSub>();
        require_sync::<Monitor>();
        #[cfg(feature = "connection-manager")]
        require_sync::<ConnectionManager>();
        #[cfg(feature = "cluster-async")]
        require_sync::<cluster_async::ClusterConnection>();
    }

    /// Compile-time check that the async connection types are `Send`.
    #[test]
    fn test_is_send() {
        require_send::<MultiplexedConnection>();
        require_send::<PubSub>();
        require_send::<Monitor>();
        #[cfg(feature = "connection-manager")]
        require_send::<ConnectionManager>();
        #[cfg(feature = "cluster-async")]
        require_send::<cluster_async::ClusterConnection>();
    }
}