tokio/net/tcp/stream.rs
1cfg_not_wasi! {
2 use std::time::Duration;
3}
4
5cfg_not_wasip1! {
6 use crate::net::{to_socket_addrs, ToSocketAddrs};
7 use std::future::poll_fn;
8}
9
10use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
11use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
12use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
13use crate::util::check_socket_for_blocking;
14
15use std::fmt;
16use std::io;
17use std::net::{Shutdown, SocketAddr};
18use std::pin::Pin;
19use std::task::{ready, Context, Poll};
20
21cfg_io_util! {
22 use bytes::BufMut;
23}
24
25cfg_net! {
26 /// A TCP stream between a local and a remote socket.
27 ///
28 /// A TCP stream can either be created by connecting to an endpoint, via the
29 /// [`connect`] method, or by [accepting] a connection from a [listener]. A
30 /// TCP stream can also be created via the [`TcpSocket`] type.
31 ///
32 /// Reading and writing to a `TcpStream` is usually done using the
33 /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`]
34 /// traits.
35 ///
36 /// [`connect`]: method@TcpStream::connect
37 /// [accepting]: method@crate::net::TcpListener::accept
38 /// [listener]: struct@crate::net::TcpListener
39 /// [`TcpSocket`]: struct@crate::net::TcpSocket
40 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
41 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
42 ///
43 /// # Examples
44 ///
45 /// ```no_run
46 /// use tokio::net::TcpStream;
47 /// use tokio::io::AsyncWriteExt;
48 /// use std::error::Error;
49 ///
50 /// #[tokio::main]
51 /// async fn main() -> Result<(), Box<dyn Error>> {
52 /// // Connect to a peer
53 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
54 ///
55 /// // Write some data.
56 /// stream.write_all(b"hello world!").await?;
57 ///
58 /// Ok(())
59 /// }
60 /// ```
61 ///
62 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
63 ///
64 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
65 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
66 ///
67 /// To shut down the stream in the write direction, you can call the
68 /// [`shutdown()`] method. This will cause the other peer to receive a read of
69 /// length 0, indicating that no more data will be sent. This only closes
70 /// the stream in one direction.
71 ///
72 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
73 pub struct TcpStream {
74 io: PollEvented<mio::net::TcpStream>,
75 }
76}
77
78impl TcpStream {
79 cfg_not_wasip1! {
80 /// Opens a TCP connection to a remote host.
81 ///
82 /// `addr` is an address of the remote host. Anything which implements the
83 /// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
84 /// yields multiple addresses, connect will be attempted with each of the
85 /// addresses until a connection is successful. If none of the addresses
86 /// result in a successful connection, the error returned from the last
87 /// connection attempt (the last address) is returned.
88 ///
89 /// To configure the socket before connecting, you can use the [`TcpSocket`]
90 /// type.
91 ///
92 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
93 /// [`TcpSocket`]: struct@crate::net::TcpSocket
94 ///
95 /// # Examples
96 ///
97 /// ```no_run
98 /// use tokio::net::TcpStream;
99 /// use tokio::io::AsyncWriteExt;
100 /// use std::error::Error;
101 ///
102 /// #[tokio::main]
103 /// async fn main() -> Result<(), Box<dyn Error>> {
104 /// // Connect to a peer
105 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
106 ///
107 /// // Write some data.
108 /// stream.write_all(b"hello world!").await?;
109 ///
110 /// Ok(())
111 /// }
112 /// ```
113 ///
114 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
115 ///
116 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
117 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
118 pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
119 let addrs = to_socket_addrs(addr).await?;
120
121 let mut last_err = None;
122
123 for addr in addrs {
124 match TcpStream::connect_addr(addr).await {
125 Ok(stream) => return Ok(stream),
126 Err(e) => last_err = Some(e),
127 }
128 }
129
130 Err(last_err.unwrap_or_else(|| {
131 io::Error::new(
132 io::ErrorKind::InvalidInput,
133 "could not resolve to any address",
134 )
135 }))
136 }
137
138 /// Establishes a connection to the specified `addr`.
139 async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
140 let sys = mio::net::TcpStream::connect(addr)?;
141 TcpStream::connect_mio(sys).await
142 }
143
144 pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
145 let stream = TcpStream::new(sys)?;
146
147 // Once we've connected, wait for the stream to be writable as
148 // that's when the actual connection has been initiated. Once we're
149 // writable we check for `take_socket_error` to see if the connect
150 // actually hit an error or not.
151 //
152 // If all that succeeded then we ship everything on up.
153 poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
154
155 if let Some(e) = stream.io.take_error()? {
156 return Err(e);
157 }
158
159 Ok(stream)
160 }
161 }
162
163 pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
164 let io = PollEvented::new(connected)?;
165 Ok(TcpStream { io })
166 }
167
168 /// Creates new `TcpStream` from a `std::net::TcpStream`.
169 ///
170 /// This function is intended to be used to wrap a TCP stream from the
171 /// standard library in the Tokio equivalent.
172 ///
173 /// # Notes
174 ///
175 /// The caller is responsible for ensuring that the stream is in
176 /// non-blocking mode. Otherwise all I/O operations on the stream
177 /// will block the thread, which will cause unexpected behavior.
178 /// Non-blocking mode can be set using [`set_nonblocking`].
179 ///
180 /// Passing a listener in blocking mode is always erroneous,
181 /// and the behavior in that case may change in the future.
182 /// For example, it could panic.
183 ///
184 /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
185 ///
186 /// # Examples
187 ///
188 /// ```rust,no_run
189 /// use std::error::Error;
190 /// use tokio::net::TcpStream;
191 ///
192 /// #[tokio::main]
193 /// async fn main() -> Result<(), Box<dyn Error>> {
194 /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
195 /// std_stream.set_nonblocking(true)?;
196 /// let stream = TcpStream::from_std(std_stream)?;
197 /// Ok(())
198 /// }
199 /// ```
200 ///
201 /// # Panics
202 ///
203 /// This function panics if it is not called from within a runtime with
204 /// IO enabled.
205 ///
206 /// The runtime is usually set implicitly when this function is called
207 /// from a future driven by a tokio runtime, otherwise runtime can be set
208 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
209 #[track_caller]
210 pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> {
211 check_socket_for_blocking(&stream)?;
212
213 let io = mio::net::TcpStream::from_std(stream);
214 let io = PollEvented::new(io)?;
215 Ok(TcpStream { io })
216 }
217
218 /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
219 ///
220 /// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`.
221 /// Use [`set_nonblocking`] to change the blocking mode if needed.
222 ///
223 /// # Examples
224 ///
225 /// ```
226 /// use std::error::Error;
227 /// use std::io::Read;
228 /// use tokio::net::TcpListener;
229 /// # use tokio::net::TcpStream;
230 /// # use tokio::io::AsyncWriteExt;
231 ///
232 /// #[tokio::main]
233 /// async fn main() -> Result<(), Box<dyn Error>> {
234 /// # if cfg!(miri) { return Ok(()); } // No `socket` in miri.
235 /// let mut data = [0u8; 12];
236 /// # if false {
237 /// let listener = TcpListener::bind("127.0.0.1:34254").await?;
238 /// # }
239 /// # let listener = TcpListener::bind("127.0.0.1:0").await?;
240 /// # let addr = listener.local_addr().unwrap();
241 /// # let handle = tokio::spawn(async move {
242 /// # let mut stream: TcpStream = TcpStream::connect(addr).await.unwrap();
243 /// # stream.write_all(b"Hello world!").await.unwrap();
244 /// # });
245 /// let (tokio_tcp_stream, _) = listener.accept().await?;
246 /// let mut std_tcp_stream = tokio_tcp_stream.into_std()?;
247 /// # handle.await.expect("The task being joined has panicked");
248 /// std_tcp_stream.set_nonblocking(false)?;
249 /// std_tcp_stream.read_exact(&mut data)?;
250 /// # assert_eq!(b"Hello world!", &data);
251 /// Ok(())
252 /// }
253 /// ```
254 /// [`tokio::net::TcpStream`]: TcpStream
255 /// [`std::net::TcpStream`]: std::net::TcpStream
256 /// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking
257 pub fn into_std(self) -> io::Result<std::net::TcpStream> {
258 #[cfg(unix)]
259 {
260 use std::os::unix::io::{FromRawFd, IntoRawFd};
261 self.io
262 .into_inner()
263 .map(IntoRawFd::into_raw_fd)
264 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
265 }
266
267 #[cfg(windows)]
268 {
269 use std::os::windows::io::{FromRawSocket, IntoRawSocket};
270 self.io
271 .into_inner()
272 .map(|io| io.into_raw_socket())
273 .map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) })
274 }
275
276 #[cfg(target_os = "wasi")]
277 {
278 use std::os::fd::{FromRawFd, IntoRawFd};
279 self.io
280 .into_inner()
281 .map(|io| io.into_raw_fd())
282 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
283 }
284 }
285
286 /// Returns the local address that this stream is bound to.
287 ///
288 /// # Examples
289 ///
290 /// ```no_run
291 /// use tokio::net::TcpStream;
292 ///
293 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
294 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
295 ///
296 /// println!("{:?}", stream.local_addr()?);
297 /// # Ok(())
298 /// # }
299 /// ```
300 pub fn local_addr(&self) -> io::Result<SocketAddr> {
301 self.io.local_addr()
302 }
303
304 /// Returns the value of the `SO_ERROR` option.
305 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
306 self.io.take_error()
307 }
308
309 /// Returns the remote address that this stream is connected to.
310 ///
311 /// # Examples
312 ///
313 /// ```no_run
314 /// use tokio::net::TcpStream;
315 ///
316 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
317 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
318 ///
319 /// println!("{:?}", stream.peer_addr()?);
320 /// # Ok(())
321 /// # }
322 /// ```
323 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
324 self.io.peer_addr()
325 }
326
327 /// Attempts to receive data on the socket, without removing that data from
328 /// the queue, registering the current task for wakeup if data is not yet
329 /// available.
330 ///
331 /// Note that on multiple calls to `poll_peek`, `poll_read` or
332 /// `poll_read_ready`, only the `Waker` from the `Context` passed to the
333 /// most recent call is scheduled to receive a wakeup. (However,
334 /// `poll_write` retains a second, independent waker.)
335 ///
336 /// # Return value
337 ///
338 /// The function returns:
339 ///
340 /// * `Poll::Pending` if data is not yet available.
341 /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked.
342 /// * `Poll::Ready(Err(e))` if an error is encountered.
343 ///
344 /// # Errors
345 ///
346 /// This function may encounter any standard I/O error except `WouldBlock`.
347 ///
348 /// # Examples
349 ///
350 /// ```no_run
351 /// use tokio::io::{self, ReadBuf};
352 /// use tokio::net::TcpStream;
353 ///
354 /// use std::future::poll_fn;
355 ///
356 /// #[tokio::main]
357 /// async fn main() -> io::Result<()> {
358 /// let stream = TcpStream::connect("127.0.0.1:8000").await?;
359 /// let mut buf = [0; 10];
360 /// let mut buf = ReadBuf::new(&mut buf);
361 ///
362 /// poll_fn(|cx| {
363 /// stream.poll_peek(cx, &mut buf)
364 /// }).await?;
365 ///
366 /// Ok(())
367 /// }
368 /// ```
369 pub fn poll_peek(
370 &self,
371 cx: &mut Context<'_>,
372 buf: &mut ReadBuf<'_>,
373 ) -> Poll<io::Result<usize>> {
374 loop {
375 let ev = ready!(self.io.registration().poll_read_ready(cx))?;
376
377 let b = unsafe {
378 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
379 };
380
381 match self.io.peek(b) {
382 Ok(ret) => {
383 unsafe { buf.assume_init(ret) };
384 buf.advance(ret);
385 return Poll::Ready(Ok(ret));
386 }
387 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
388 self.io.registration().clear_readiness(ev);
389 }
390 Err(e) => return Poll::Ready(Err(e)),
391 }
392 }
393 }
394
395 /// Waits for any of the requested ready states.
396 ///
397 /// This function is usually paired with `try_read()` or `try_write()`. It
398 /// can be used to concurrently read / write to the same socket on a single
399 /// task without splitting the socket.
400 ///
401 /// The function may complete without the socket being ready. This is a
402 /// false-positive and attempting an operation will return with
403 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
404 /// [`Ready`] set, so you should always check the returned value and possibly
405 /// wait again if the requested states are not set.
406 ///
407 /// # Cancel safety
408 ///
409 /// This method is cancel safe. Once a readiness event occurs, the method
410 /// will continue to return immediately until the readiness event is
411 /// consumed by an attempt to read or write that fails with `WouldBlock` or
412 /// `Poll::Pending`.
413 ///
414 /// # Examples
415 ///
416 /// Concurrently read and write to the stream on the same task without
417 /// splitting.
418 ///
419 /// ```no_run
420 /// use tokio::io::Interest;
421 /// use tokio::net::TcpStream;
422 /// use std::error::Error;
423 /// use std::io;
424 ///
425 /// #[tokio::main]
426 /// async fn main() -> Result<(), Box<dyn Error>> {
427 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
428 ///
429 /// loop {
430 /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
431 ///
432 /// if ready.is_readable() {
433 /// let mut data = vec![0; 1024];
434 /// // Try to read data, this may still fail with `WouldBlock`
435 /// // if the readiness event is a false positive.
436 /// match stream.try_read(&mut data) {
437 /// Ok(n) => {
438 /// println!("read {} bytes", n);
439 /// }
440 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
441 /// continue;
442 /// }
443 /// Err(e) => {
444 /// return Err(e.into());
445 /// }
446 /// }
447 ///
448 /// }
449 ///
450 /// if ready.is_writable() {
451 /// // Try to write data, this may still fail with `WouldBlock`
452 /// // if the readiness event is a false positive.
453 /// match stream.try_write(b"hello world") {
454 /// Ok(n) => {
455 /// println!("write {} bytes", n);
456 /// }
457 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
458 /// continue
459 /// }
460 /// Err(e) => {
461 /// return Err(e.into());
462 /// }
463 /// }
464 /// }
465 /// }
466 /// }
467 /// ```
468 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
469 let event = self.io.registration().readiness(interest).await?;
470 Ok(event.ready)
471 }
472
473 /// Waits for the socket to become readable.
474 ///
475 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
476 /// paired with `try_read()`.
477 ///
478 /// # Cancel safety
479 ///
480 /// This method is cancel safe. Once a readiness event occurs, the method
481 /// will continue to return immediately until the readiness event is
482 /// consumed by an attempt to read that fails with `WouldBlock` or
483 /// `Poll::Pending`.
484 ///
485 /// # Examples
486 ///
487 /// ```no_run
488 /// use tokio::net::TcpStream;
489 /// use std::error::Error;
490 /// use std::io;
491 ///
492 /// #[tokio::main]
493 /// async fn main() -> Result<(), Box<dyn Error>> {
494 /// // Connect to a peer
495 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
496 ///
497 /// let mut msg = vec![0; 1024];
498 ///
499 /// loop {
500 /// // Wait for the socket to be readable
501 /// stream.readable().await?;
502 ///
503 /// // Try to read data, this may still fail with `WouldBlock`
504 /// // if the readiness event is a false positive.
505 /// match stream.try_read(&mut msg) {
506 /// Ok(n) => {
507 /// msg.truncate(n);
508 /// break;
509 /// }
510 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
511 /// continue;
512 /// }
513 /// Err(e) => {
514 /// return Err(e.into());
515 /// }
516 /// }
517 /// }
518 ///
519 /// println!("GOT = {:?}", msg);
520 /// Ok(())
521 /// }
522 /// ```
523 pub async fn readable(&self) -> io::Result<()> {
524 self.ready(Interest::READABLE).await?;
525 Ok(())
526 }
527
528 /// Polls for read readiness.
529 ///
530 /// If the tcp stream is not currently ready for reading, this method will
531 /// store a clone of the `Waker` from the provided `Context`. When the tcp
532 /// stream becomes ready for reading, `Waker::wake` will be called on the
533 /// waker.
534 ///
535 /// Note that on multiple calls to `poll_read_ready`, `poll_read` or
536 /// `poll_peek`, only the `Waker` from the `Context` passed to the most
537 /// recent call is scheduled to receive a wakeup. (However,
538 /// `poll_write_ready` retains a second, independent waker.)
539 ///
540 /// This function is intended for cases where creating and pinning a future
541 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
542 /// preferred, as this supports polling from multiple tasks at once.
543 ///
544 /// # Return value
545 ///
546 /// The function returns:
547 ///
548 /// * `Poll::Pending` if the tcp stream is not ready for reading.
549 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for reading.
550 /// * `Poll::Ready(Err(e))` if an error is encountered.
551 ///
552 /// # Errors
553 ///
554 /// This function may encounter any standard I/O error except `WouldBlock`.
555 ///
556 /// [`readable`]: method@Self::readable
557 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
558 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
559 }
560
561 /// Tries to read data from the stream into the provided buffer, returning how
562 /// many bytes were read.
563 ///
564 /// Receives any pending data from the socket but does not wait for new data
565 /// to arrive. On success, returns the number of bytes read. Because
566 /// `try_read()` is non-blocking, the buffer does not have to be stored by
567 /// the async task and can exist entirely on the stack.
568 ///
569 /// Usually, [`readable()`] or [`ready()`] is used with this function.
570 ///
571 /// [`readable()`]: TcpStream::readable()
572 /// [`ready()`]: TcpStream::ready()
573 ///
574 /// # Return
575 ///
576 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
577 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
578 ///
579 /// 1. The stream's read half is closed and will no longer yield data.
580 /// 2. The specified buffer was 0 bytes in length.
581 ///
582 /// If the stream is not ready to read data,
583 /// `Err(io::ErrorKind::WouldBlock)` is returned.
584 ///
585 /// # Examples
586 ///
587 /// ```no_run
588 /// use tokio::net::TcpStream;
589 /// use std::error::Error;
590 /// use std::io;
591 ///
592 /// #[tokio::main]
593 /// async fn main() -> Result<(), Box<dyn Error>> {
594 /// // Connect to a peer
595 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
596 ///
597 /// loop {
598 /// // Wait for the socket to be readable
599 /// stream.readable().await?;
600 ///
601 /// // Creating the buffer **after** the `await` prevents it from
602 /// // being stored in the async task.
603 /// let mut buf = [0; 4096];
604 ///
605 /// // Try to read data, this may still fail with `WouldBlock`
606 /// // if the readiness event is a false positive.
607 /// match stream.try_read(&mut buf) {
608 /// Ok(0) => break,
609 /// Ok(n) => {
610 /// println!("read {} bytes", n);
611 /// }
612 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
613 /// continue;
614 /// }
615 /// Err(e) => {
616 /// return Err(e.into());
617 /// }
618 /// }
619 /// }
620 ///
621 /// Ok(())
622 /// }
623 /// ```
624 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
625 use std::io::Read;
626
627 self.io
628 .registration()
629 .try_io(Interest::READABLE, || (&*self.io).read(buf))
630 }
631
632 /// Tries to read data from the stream into the provided buffers, returning
633 /// how many bytes were read.
634 ///
635 /// Data is copied to fill each buffer in order, with the final buffer
636 /// written to possibly being only partially filled. This method behaves
637 /// equivalently to a single call to [`try_read()`] with concatenated
638 /// buffers.
639 ///
640 /// Receives any pending data from the socket but does not wait for new data
641 /// to arrive. On success, returns the number of bytes read. Because
642 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
643 /// stored by the async task and can exist entirely on the stack.
644 ///
645 /// Usually, [`readable()`] or [`ready()`] is used with this function.
646 ///
647 /// [`try_read()`]: TcpStream::try_read()
648 /// [`readable()`]: TcpStream::readable()
649 /// [`ready()`]: TcpStream::ready()
650 ///
651 /// # Return
652 ///
653 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
654 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
655 /// and will no longer yield data. If the stream is not ready to read data
656 /// `Err(io::ErrorKind::WouldBlock)` is returned.
657 ///
658 /// # Examples
659 ///
660 /// ```no_run
661 /// use tokio::net::TcpStream;
662 /// use std::error::Error;
663 /// use std::io::{self, IoSliceMut};
664 ///
665 /// #[tokio::main]
666 /// async fn main() -> Result<(), Box<dyn Error>> {
667 /// // Connect to a peer
668 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
669 ///
670 /// loop {
671 /// // Wait for the socket to be readable
672 /// stream.readable().await?;
673 ///
674 /// // Creating the buffer **after** the `await` prevents it from
675 /// // being stored in the async task.
676 /// let mut buf_a = [0; 512];
677 /// let mut buf_b = [0; 1024];
678 /// let mut bufs = [
679 /// IoSliceMut::new(&mut buf_a),
680 /// IoSliceMut::new(&mut buf_b),
681 /// ];
682 ///
683 /// // Try to read data, this may still fail with `WouldBlock`
684 /// // if the readiness event is a false positive.
685 /// match stream.try_read_vectored(&mut bufs) {
686 /// Ok(0) => break,
687 /// Ok(n) => {
688 /// println!("read {} bytes", n);
689 /// }
690 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
691 /// continue;
692 /// }
693 /// Err(e) => {
694 /// return Err(e.into());
695 /// }
696 /// }
697 /// }
698 ///
699 /// Ok(())
700 /// }
701 /// ```
702 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
703 use std::io::Read;
704
705 self.io
706 .registration()
707 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
708 }
709
710 cfg_io_util! {
711 /// Tries to read data from the stream into the provided buffer, advancing the
712 /// buffer's internal cursor, returning how many bytes were read.
713 ///
714 /// Receives any pending data from the socket but does not wait for new data
715 /// to arrive. On success, returns the number of bytes read. Because
716 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
717 /// the async task and can exist entirely on the stack.
718 ///
719 /// Usually, [`readable()`] or [`ready()`] is used with this function.
720 ///
721 /// [`readable()`]: TcpStream::readable()
722 /// [`ready()`]: TcpStream::ready()
723 ///
724 /// # Return
725 ///
726 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
727 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
728 /// and will no longer yield data. If the stream is not ready to read data
729 /// `Err(io::ErrorKind::WouldBlock)` is returned.
730 ///
731 /// # Examples
732 ///
733 /// ```no_run
734 /// use tokio::net::TcpStream;
735 /// use std::error::Error;
736 /// use std::io;
737 ///
738 /// #[tokio::main]
739 /// async fn main() -> Result<(), Box<dyn Error>> {
740 /// // Connect to a peer
741 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
742 ///
743 /// loop {
744 /// // Wait for the socket to be readable
745 /// stream.readable().await?;
746 ///
747 /// let mut buf = Vec::with_capacity(4096);
748 ///
749 /// // Try to read data, this may still fail with `WouldBlock`
750 /// // if the readiness event is a false positive.
751 /// match stream.try_read_buf(&mut buf) {
752 /// Ok(0) => break,
753 /// Ok(n) => {
754 /// println!("read {} bytes", n);
755 /// }
756 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
757 /// continue;
758 /// }
759 /// Err(e) => {
760 /// return Err(e.into());
761 /// }
762 /// }
763 /// }
764 ///
765 /// Ok(())
766 /// }
767 /// ```
768 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
769 self.io.registration().try_io(Interest::READABLE, || {
770 use std::io::Read;
771
772 let dst = buf.chunk_mut();
773 let dst =
774 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
775
776 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
777 // buffer.
778 let n = (&*self.io).read(dst)?;
779
780 unsafe {
781 buf.advance_mut(n);
782 }
783
784 Ok(n)
785 })
786 }
787 }
788
789 /// Waits for the socket to become writable.
790 ///
791 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
792 /// paired with `try_write()`.
793 ///
794 /// # Cancel safety
795 ///
796 /// This method is cancel safe. Once a readiness event occurs, the method
797 /// will continue to return immediately until the readiness event is
798 /// consumed by an attempt to write that fails with `WouldBlock` or
799 /// `Poll::Pending`.
800 ///
801 /// # Examples
802 ///
803 /// ```no_run
804 /// use tokio::net::TcpStream;
805 /// use std::error::Error;
806 /// use std::io;
807 ///
808 /// #[tokio::main]
809 /// async fn main() -> Result<(), Box<dyn Error>> {
810 /// // Connect to a peer
811 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
812 ///
813 /// loop {
814 /// // Wait for the socket to be writable
815 /// stream.writable().await?;
816 ///
817 /// // Try to write data, this may still fail with `WouldBlock`
818 /// // if the readiness event is a false positive.
819 /// match stream.try_write(b"hello world") {
820 /// Ok(n) => {
821 /// break;
822 /// }
823 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
824 /// continue;
825 /// }
826 /// Err(e) => {
827 /// return Err(e.into());
828 /// }
829 /// }
830 /// }
831 ///
832 /// Ok(())
833 /// }
834 /// ```
835 pub async fn writable(&self) -> io::Result<()> {
836 self.ready(Interest::WRITABLE).await?;
837 Ok(())
838 }
839
840 /// Polls for write readiness.
841 ///
842 /// If the tcp stream is not currently ready for writing, this method will
843 /// store a clone of the `Waker` from the provided `Context`. When the tcp
844 /// stream becomes ready for writing, `Waker::wake` will be called on the
845 /// waker.
846 ///
847 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
848 /// the `Waker` from the `Context` passed to the most recent call is
849 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
850 /// second, independent waker.)
851 ///
852 /// This function is intended for cases where creating and pinning a future
853 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
854 /// preferred, as this supports polling from multiple tasks at once.
855 ///
856 /// # Return value
857 ///
858 /// The function returns:
859 ///
860 /// * `Poll::Pending` if the tcp stream is not ready for writing.
861 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for writing.
862 /// * `Poll::Ready(Err(e))` if an error is encountered.
863 ///
864 /// # Errors
865 ///
866 /// This function may encounter any standard I/O error except `WouldBlock`.
867 ///
868 /// [`writable`]: method@Self::writable
869 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
870 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
871 }
872
873 /// Try to write a buffer to the stream, returning how many bytes were
874 /// written.
875 ///
876 /// The function will attempt to write the entire contents of `buf`, but
877 /// only part of the buffer may be written.
878 ///
879 /// This function is usually paired with `writable()`.
880 ///
881 /// # Return
882 ///
883 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
884 /// number of bytes written. If the stream is not ready to write data,
885 /// `Err(io::ErrorKind::WouldBlock)` is returned.
886 ///
887 /// # Examples
888 ///
889 /// ```no_run
890 /// use tokio::net::TcpStream;
891 /// use std::error::Error;
892 /// use std::io;
893 ///
894 /// #[tokio::main]
895 /// async fn main() -> Result<(), Box<dyn Error>> {
896 /// // Connect to a peer
897 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
898 ///
899 /// loop {
900 /// // Wait for the socket to be writable
901 /// stream.writable().await?;
902 ///
903 /// // Try to write data, this may still fail with `WouldBlock`
904 /// // if the readiness event is a false positive.
905 /// match stream.try_write(b"hello world") {
906 /// Ok(n) => {
907 /// break;
908 /// }
909 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
910 /// continue;
911 /// }
912 /// Err(e) => {
913 /// return Err(e.into());
914 /// }
915 /// }
916 /// }
917 ///
918 /// Ok(())
919 /// }
920 /// ```
921 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
922 use std::io::Write;
923
924 self.io
925 .registration()
926 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
927 }
928
929 /// Tries to write several buffers to the stream, returning how many bytes
930 /// were written.
931 ///
932 /// Data is written from each buffer in order, with the final buffer read
933 /// from possibly being only partially consumed. This method behaves
934 /// equivalently to a single call to [`try_write()`] with concatenated
935 /// buffers.
936 ///
937 /// This function is usually paired with `writable()`.
938 ///
939 /// [`try_write()`]: TcpStream::try_write()
940 ///
941 /// # Return
942 ///
943 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
944 /// number of bytes written. If the stream is not ready to write data,
945 /// `Err(io::ErrorKind::WouldBlock)` is returned.
946 ///
947 /// # Examples
948 ///
949 /// ```no_run
950 /// use tokio::net::TcpStream;
951 /// use std::error::Error;
952 /// use std::io;
953 ///
954 /// #[tokio::main]
955 /// async fn main() -> Result<(), Box<dyn Error>> {
956 /// // Connect to a peer
957 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
958 ///
959 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
960 ///
961 /// loop {
962 /// // Wait for the socket to be writable
963 /// stream.writable().await?;
964 ///
965 /// // Try to write data, this may still fail with `WouldBlock`
966 /// // if the readiness event is a false positive.
967 /// match stream.try_write_vectored(&bufs) {
968 /// Ok(n) => {
969 /// break;
970 /// }
971 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
972 /// continue;
973 /// }
974 /// Err(e) => {
975 /// return Err(e.into());
976 /// }
977 /// }
978 /// }
979 ///
980 /// Ok(())
981 /// }
982 /// ```
983 pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
984 use std::io::Write;
985
986 self.io
987 .registration()
988 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
989 }
990
991 /// Tries to read or write from the socket using a user-provided IO operation.
992 ///
993 /// If the socket is ready, the provided closure is called. The closure
994 /// should attempt to perform IO operation on the socket by manually
995 /// calling the appropriate syscall. If the operation fails because the
996 /// socket is not actually ready, then the closure should return a
997 /// `WouldBlock` error and the readiness flag is cleared. The return value
998 /// of the closure is then returned by `try_io`.
999 ///
1000 /// If the socket is not ready, then the closure is not called
1001 /// and a `WouldBlock` error is returned.
1002 ///
1003 /// The closure should only return a `WouldBlock` error if it has performed
1004 /// an IO operation on the socket that failed due to the socket not being
1005 /// ready. Returning a `WouldBlock` error in any other situation will
1006 /// incorrectly clear the readiness flag, which can cause the socket to
1007 /// behave incorrectly.
1008 ///
1009 /// The closure should not perform the IO operation using any of the methods
1010 /// defined on the Tokio `TcpStream` type, as this will mess with the
1011 /// readiness flag and can cause the socket to behave incorrectly.
1012 ///
1013 /// This method is not intended to be used with combined interests.
1014 /// The closure should perform only one type of IO operation, so it should not
1015 /// require more than one ready state. This method may panic or sleep forever
1016 /// if it is called with a combined interest.
1017 ///
1018 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1019 ///
1020 /// [`readable()`]: TcpStream::readable()
1021 /// [`writable()`]: TcpStream::writable()
1022 /// [`ready()`]: TcpStream::ready()
1023 pub fn try_io<R>(
1024 &self,
1025 interest: Interest,
1026 f: impl FnOnce() -> io::Result<R>,
1027 ) -> io::Result<R> {
1028 self.io
1029 .registration()
1030 .try_io(interest, || self.io.try_io(f))
1031 }
1032
1033 /// Reads or writes from the socket using a user-provided IO operation.
1034 ///
1035 /// The readiness of the socket is awaited and when the socket is ready,
1036 /// the provided closure is called. The closure should attempt to perform
1037 /// IO operation on the socket by manually calling the appropriate syscall.
1038 /// If the operation fails because the socket is not actually ready,
1039 /// then the closure should return a `WouldBlock` error. In such case the
1040 /// readiness flag is cleared and the socket readiness is awaited again.
1041 /// This loop is repeated until the closure returns an `Ok` or an error
1042 /// other than `WouldBlock`.
1043 ///
1044 /// The closure should only return a `WouldBlock` error if it has performed
1045 /// an IO operation on the socket that failed due to the socket not being
1046 /// ready. Returning a `WouldBlock` error in any other situation will
1047 /// incorrectly clear the readiness flag, which can cause the socket to
1048 /// behave incorrectly.
1049 ///
1050 /// The closure should not perform the IO operation using any of the methods
1051 /// defined on the Tokio `TcpStream` type, as this will mess with the
1052 /// readiness flag and can cause the socket to behave incorrectly.
1053 ///
1054 /// This method is not intended to be used with combined interests.
1055 /// The closure should perform only one type of IO operation, so it should not
1056 /// require more than one ready state. This method may panic or sleep forever
1057 /// if it is called with a combined interest.
1058 pub async fn async_io<R>(
1059 &self,
1060 interest: Interest,
1061 mut f: impl FnMut() -> io::Result<R>,
1062 ) -> io::Result<R> {
1063 self.io
1064 .registration()
1065 .async_io(interest, || self.io.try_io(&mut f))
1066 .await
1067 }
1068
1069 /// Receives data on the socket from the remote address to which it is
1070 /// connected, without removing that data from the queue. On success,
1071 /// returns the number of bytes peeked.
1072 ///
1073 /// Successive calls return the same data. This is accomplished by passing
1074 /// `MSG_PEEK` as a flag to the underlying `recv` system call.
1075 ///
1076 /// # Cancel safety
1077 ///
1078 /// This method is cancel safe. If the method is used as the event in a
1079 /// [`tokio::select!`](crate::select) statement and some other branch
1080 /// completes first, then it is guaranteed that no peek was performed, and
1081 /// that `buf` has not been modified.
1082 ///
1083 /// # Examples
1084 ///
1085 /// ```no_run
1086 /// use tokio::net::TcpStream;
1087 /// use tokio::io::AsyncReadExt;
1088 /// use std::error::Error;
1089 ///
1090 /// #[tokio::main]
1091 /// async fn main() -> Result<(), Box<dyn Error>> {
1092 /// // Connect to a peer
1093 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
1094 ///
1095 /// let mut b1 = [0; 10];
1096 /// let mut b2 = [0; 10];
1097 ///
1098 /// // Peek at the data
1099 /// let n = stream.peek(&mut b1).await?;
1100 ///
1101 /// // Read the data
1102 /// assert_eq!(n, stream.read(&mut b2[..n]).await?);
1103 /// assert_eq!(&b1[..n], &b2[..n]);
1104 ///
1105 /// Ok(())
1106 /// }
1107 /// ```
1108 ///
1109 /// The [`read`] method is defined on the [`AsyncReadExt`] trait.
1110 ///
1111 /// [`read`]: fn@crate::io::AsyncReadExt::read
1112 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
1113 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1114 self.io
1115 .registration()
1116 .async_io(Interest::READABLE, || self.io.peek(buf))
1117 .await
1118 }
1119
1120 /// Shuts down the read, write, or both halves of this connection.
1121 ///
1122 /// This function will cause all pending and future I/O on the specified
1123 /// portions to return immediately with an appropriate value (see the
1124 /// documentation of `Shutdown`).
1125 ///
1126 /// Remark: this function transforms `Err(std::io::ErrorKind::NotConnected)` to `Ok(())`.
1127 /// It does this to abstract away OS specific logic and to prevent a race condition between
1128 /// this function call and the OS closing this socket because of external events (e.g. TCP reset).
1129 /// See <https://github.com/tokio-rs/tokio/issues/4665> for more information.
1130 pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
1131 match self.io.shutdown(how) {
1132 Err(err) if err.kind() == std::io::ErrorKind::NotConnected => Ok(()),
1133 result => result,
1134 }
1135 }
1136
1137 /// Gets the value of the `TCP_NODELAY` option on this socket.
1138 ///
1139 /// For more information about this option, see [`set_nodelay`].
1140 ///
1141 /// [`set_nodelay`]: TcpStream::set_nodelay
1142 ///
1143 /// # Examples
1144 ///
1145 /// ```no_run
1146 /// use tokio::net::TcpStream;
1147 ///
1148 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1149 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1150 ///
1151 /// println!("{:?}", stream.nodelay()?);
1152 /// # Ok(())
1153 /// # }
1154 /// ```
1155 pub fn nodelay(&self) -> io::Result<bool> {
1156 self.io.nodelay()
1157 }
1158
1159 /// Sets the value of the `TCP_NODELAY` option on this socket.
1160 ///
1161 /// If set, this option disables the Nagle algorithm. This means that
1162 /// segments are always sent as soon as possible, even if there is only a
1163 /// small amount of data. When not set, data is buffered until there is a
1164 /// sufficient amount to send out, thereby avoiding the frequent sending of
1165 /// small packets.
1166 ///
1167 /// # Examples
1168 ///
1169 /// ```no_run
1170 /// use tokio::net::TcpStream;
1171 ///
1172 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1173 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1174 ///
1175 /// stream.set_nodelay(true)?;
1176 /// # Ok(())
1177 /// # }
1178 /// ```
1179 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
1180 self.io.set_nodelay(nodelay)
1181 }
1182
1183 /// Gets the value of the `TCP_QUICKACK` option on this socket.
1184 ///
1185 /// For more information about this option, see [`TcpStream::set_quickack`].
1186 ///
1187 /// # Examples
1188 ///
1189 /// ```no_run
1190 /// use tokio::net::TcpStream;
1191 ///
1192 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1193 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1194 ///
1195 /// stream.quickack()?;
1196 /// # Ok(())
1197 /// # }
1198 /// ```
1199 #[cfg(any(
1200 target_os = "linux",
1201 target_os = "android",
1202 target_os = "fuchsia",
1203 target_os = "cygwin",
1204 ))]
1205 #[cfg_attr(
1206 docsrs,
1207 doc(cfg(any(
1208 target_os = "linux",
1209 target_os = "android",
1210 target_os = "fuchsia",
1211 target_os = "cygwin"
1212 )))
1213 )]
1214 pub fn quickack(&self) -> io::Result<bool> {
1215 socket2::SockRef::from(self).tcp_quickack()
1216 }
1217
1218 /// Enable or disable `TCP_QUICKACK`.
1219 ///
1220 /// This flag causes Linux to eagerly send `ACK`s rather than delaying them.
1221 /// Linux may reset this flag after further operations on the socket.
1222 ///
1223 /// See [`man 7 tcp`](https://man7.org/linux/man-pages/man7/tcp.7.html) and
1224 /// [TCP delayed acknowledgment](https://en.wikipedia.org/wiki/TCP_delayed_acknowledgment)
1225 /// for more information.
1226 ///
1227 /// # Examples
1228 ///
1229 /// ```no_run
1230 /// use tokio::net::TcpStream;
1231 ///
1232 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1233 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1234 ///
1235 /// stream.set_quickack(true)?;
1236 /// # Ok(())
1237 /// # }
1238 /// ```
1239 #[cfg(any(
1240 target_os = "linux",
1241 target_os = "android",
1242 target_os = "fuchsia",
1243 target_os = "cygwin",
1244 ))]
1245 #[cfg_attr(
1246 docsrs,
1247 doc(cfg(any(
1248 target_os = "linux",
1249 target_os = "android",
1250 target_os = "fuchsia",
1251 target_os = "cygwin"
1252 )))
1253 )]
1254 pub fn set_quickack(&self, quickack: bool) -> io::Result<()> {
1255 socket2::SockRef::from(self).set_tcp_quickack(quickack)
1256 }
1257
1258 cfg_not_wasi! {
1259 /// Reads the linger duration for this socket by getting the `SO_LINGER`
1260 /// option.
1261 ///
1262 /// For more information about this option, see [`set_zero_linger`] and [`set_linger`].
1263 ///
1264 /// [`set_linger`]: TcpStream::set_linger
1265 /// [`set_zero_linger`]: TcpStream::set_zero_linger
1266 ///
1267 /// # Examples
1268 ///
1269 /// ```no_run
1270 /// use tokio::net::TcpStream;
1271 ///
1272 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1273 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1274 ///
1275 /// println!("{:?}", stream.linger()?);
1276 /// # Ok(())
1277 /// # }
1278 /// ```
1279 pub fn linger(&self) -> io::Result<Option<Duration>> {
1280 socket2::SockRef::from(self).linger()
1281 }
1282
1283 /// Sets the linger duration of this socket by setting the `SO_LINGER` option.
1284 ///
1285 /// This option controls the action taken when a stream has unsent messages and the stream is
1286 /// closed. If `SO_LINGER` is set, the system shall block the process until it can transmit the
1287 /// data or until the time expires.
1288 ///
1289 /// If `SO_LINGER` is not specified, and the stream is closed, the system handles the call in a
1290 /// way that allows the process to continue as quickly as possible.
1291 ///
1292 /// This option is deprecated because setting `SO_LINGER` on a socket used with Tokio is
1293 /// always incorrect as it leads to blocking the thread when the socket is closed. For more
1294 /// details, please see:
1295 ///
1296 /// > Volumes of communications have been devoted to the intricacies of `SO_LINGER` versus
1297 /// > non-blocking (`O_NONBLOCK`) sockets. From what I can tell, the final word is: don't
1298 /// > do it. Rely on the `shutdown()`-followed-by-`read()`-eof technique instead.
1299 /// >
1300 /// > From [The ultimate `SO_LINGER` page, or: why is my tcp not reliable](https://blog.netherlabs.nl/articles/2009/01/18/the-ultimate-so_linger-page-or-why-is-my-tcp-not-reliable)
1301 ///
1302 /// Although this method is deprecated, it will not be removed from Tokio.
1303 ///
1304 /// Note that the special case of setting `SO_LINGER` to zero does not lead to blocking.
1305 /// Tokio provides [`set_zero_linger`](Self::set_zero_linger) for this purpose.
1306 ///
1307 /// # Examples
1308 ///
1309 /// ```no_run
1310 /// # #![allow(deprecated)]
1311 /// use tokio::net::TcpStream;
1312 ///
1313 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1314 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1315 ///
1316 /// stream.set_linger(None)?;
1317 /// # Ok(())
1318 /// # }
1319 /// ```
1320 #[deprecated = "`SO_LINGER` causes the socket to block the thread on drop"]
1321 pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
1322 socket2::SockRef::from(self).set_linger(dur)
1323 }
1324
1325 /// Sets a linger duration of zero on this socket by setting the `SO_LINGER` option.
1326 ///
1327 /// This causes the connection to be forcefully aborted ("abortive close") when the socket
1328 /// is dropped or closed. Instead of the normal TCP shutdown handshake (`FIN`/`ACK`), a TCP
1329 /// `RST` (reset) segment is sent to the peer, and the socket immediately discards any
1330 /// unsent data residing in the socket send buffer. This prevents the socket from entering
1331 /// the `TIME_WAIT` state after closing it.
1332 ///
1333 /// This is a destructive action. Any data currently buffered by the OS but not yet
1334 /// transmitted will be lost. The peer will likely receive a "Connection Reset" error
1335 /// rather than a clean end-of-stream.
1336 ///
1337 /// See the documentation for [`set_linger`](Self::set_linger) for additional details on
1338 /// how `SO_LINGER` works.
1339 ///
1340 /// # Examples
1341 ///
1342 /// ```no_run
1343 /// use std::time::Duration;
1344 /// use tokio::net::TcpStream;
1345 ///
1346 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1347 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1348 ///
1349 /// stream.set_zero_linger()?;
1350 /// assert_eq!(stream.linger()?, Some(Duration::ZERO));
1351 /// # Ok(())
1352 /// # }
1353 /// ```
1354 pub fn set_zero_linger(&self) -> io::Result<()> {
1355 socket2::SockRef::from(self).set_linger(Some(Duration::ZERO))
1356 }
1357 }
1358
1359 /// Gets the value of the `IP_TTL` option for this socket.
1360 ///
1361 /// For more information about this option, see [`set_ttl`].
1362 ///
1363 /// [`set_ttl`]: TcpStream::set_ttl
1364 ///
1365 /// # Examples
1366 ///
1367 /// ```no_run
1368 /// use tokio::net::TcpStream;
1369 ///
1370 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1371 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1372 ///
1373 /// println!("{:?}", stream.ttl()?);
1374 /// # Ok(())
1375 /// # }
1376 /// ```
1377 pub fn ttl(&self) -> io::Result<u32> {
1378 self.io.ttl()
1379 }
1380
1381 /// Sets the value for the `IP_TTL` option on this socket.
1382 ///
1383 /// This value sets the time-to-live field that is used in every packet sent
1384 /// from this socket.
1385 ///
1386 /// # Examples
1387 ///
1388 /// ```no_run
1389 /// use tokio::net::TcpStream;
1390 ///
1391 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1392 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1393 ///
1394 /// stream.set_ttl(123)?;
1395 /// # Ok(())
1396 /// # }
1397 /// ```
1398 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
1399 self.io.set_ttl(ttl)
1400 }
1401
1402 // These lifetime markers also appear in the generated documentation, and make
1403 // it more clear that this is a *borrowed* split.
1404 #[allow(clippy::needless_lifetimes)]
1405 /// Splits a `TcpStream` into a read half and a write half, which can be used
1406 /// to read and write the stream concurrently.
1407 ///
1408 /// This method is more efficient than [`into_split`], but the halves cannot be
1409 /// moved into independently spawned tasks.
1410 ///
1411 /// [`into_split`]: TcpStream::into_split()
1412 pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
1413 split(self)
1414 }
1415
1416 /// Splits a `TcpStream` into a read half and a write half, which can be used
1417 /// to read and write the stream concurrently.
1418 ///
1419 /// Unlike [`split`], the owned halves can be moved to separate tasks, however
1420 /// this comes at the cost of a heap allocation.
1421 ///
1422 /// **Note:** Dropping the write half will shut down the write half of the TCP
1423 /// stream. This is equivalent to calling [`shutdown()`] on the `TcpStream`.
1424 ///
1425 /// [`split`]: TcpStream::split()
1426 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
1427 pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
1428 split_owned(self)
1429 }
1430
1431 // == Poll IO functions that takes `&self` ==
1432 //
1433 // To read or write without mutable access to the `TcpStream`, combine the
1434 // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
1435 // `try_write` methods.
1436
1437 pub(crate) fn poll_read_priv(
1438 &self,
1439 cx: &mut Context<'_>,
1440 buf: &mut ReadBuf<'_>,
1441 ) -> Poll<io::Result<()>> {
1442 // Safety: `TcpStream::read` correctly handles reads into uninitialized memory
1443 unsafe { self.io.poll_read(cx, buf) }
1444 }
1445
1446 pub(super) fn poll_write_priv(
1447 &self,
1448 cx: &mut Context<'_>,
1449 buf: &[u8],
1450 ) -> Poll<io::Result<usize>> {
1451 self.io.poll_write(cx, buf)
1452 }
1453
1454 pub(super) fn poll_write_vectored_priv(
1455 &self,
1456 cx: &mut Context<'_>,
1457 bufs: &[io::IoSlice<'_>],
1458 ) -> Poll<io::Result<usize>> {
1459 self.io.poll_write_vectored(cx, bufs)
1460 }
1461}
1462
1463impl TryFrom<std::net::TcpStream> for TcpStream {
1464 type Error = io::Error;
1465
1466 /// Consumes stream, returning the tokio I/O object.
1467 ///
1468 /// This is equivalent to
1469 /// [`TcpStream::from_std(stream)`](TcpStream::from_std).
1470 fn try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error> {
1471 Self::from_std(stream)
1472 }
1473}
1474
1475// ===== impl Read / Write =====
1476
1477impl AsyncRead for TcpStream {
1478 fn poll_read(
1479 self: Pin<&mut Self>,
1480 cx: &mut Context<'_>,
1481 buf: &mut ReadBuf<'_>,
1482 ) -> Poll<io::Result<()>> {
1483 self.poll_read_priv(cx, buf)
1484 }
1485}
1486
1487impl AsyncWrite for TcpStream {
1488 fn poll_write(
1489 self: Pin<&mut Self>,
1490 cx: &mut Context<'_>,
1491 buf: &[u8],
1492 ) -> Poll<io::Result<usize>> {
1493 self.poll_write_priv(cx, buf)
1494 }
1495
1496 fn poll_write_vectored(
1497 self: Pin<&mut Self>,
1498 cx: &mut Context<'_>,
1499 bufs: &[io::IoSlice<'_>],
1500 ) -> Poll<io::Result<usize>> {
1501 self.poll_write_vectored_priv(cx, bufs)
1502 }
1503
1504 fn is_write_vectored(&self) -> bool {
1505 true
1506 }
1507
1508 #[inline]
1509 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1510 // tcp flush is a no-op
1511 Poll::Ready(Ok(()))
1512 }
1513
1514 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1515 self.shutdown_std(std::net::Shutdown::Write)?;
1516 Poll::Ready(Ok(()))
1517 }
1518}
1519
1520impl fmt::Debug for TcpStream {
1521 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1522 // skip PollEvented noise
1523 (*self.io).fmt(f)
1524 }
1525}
1526
1527impl AsRef<Self> for TcpStream {
1528 fn as_ref(&self) -> &Self {
1529 self
1530 }
1531}
1532
1533#[cfg(unix)]
1534mod sys {
1535 use super::TcpStream;
1536 use std::os::unix::prelude::*;
1537
1538 impl AsRawFd for TcpStream {
1539 fn as_raw_fd(&self) -> RawFd {
1540 self.io.as_raw_fd()
1541 }
1542 }
1543
1544 impl AsFd for TcpStream {
1545 fn as_fd(&self) -> BorrowedFd<'_> {
1546 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1547 }
1548 }
1549}
1550
1551cfg_windows! {
1552 use crate::os::windows::io::{AsRawSocket, RawSocket, AsSocket, BorrowedSocket};
1553
1554 impl AsRawSocket for TcpStream {
1555 fn as_raw_socket(&self) -> RawSocket {
1556 self.io.as_raw_socket()
1557 }
1558 }
1559
1560 impl AsSocket for TcpStream {
1561 fn as_socket(&self) -> BorrowedSocket<'_> {
1562 unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
1563 }
1564 }
1565}
1566
1567#[cfg(all(tokio_unstable, target_os = "wasi"))]
1568mod sys {
1569 use super::TcpStream;
1570 use std::os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd};
1571
1572 impl AsRawFd for TcpStream {
1573 fn as_raw_fd(&self) -> RawFd {
1574 self.io.as_raw_fd()
1575 }
1576 }
1577
1578 impl AsFd for TcpStream {
1579 fn as_fd(&self) -> BorrowedFd<'_> {
1580 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1581 }
1582 }
1583}