В этой статье мы собираемся изучить
futures::stream::FuturesUnordered
, мощный и эффективный инструмент для одновременного выполнения нескольких асинхронных задач. Эта утилита позволяет нам опрашивать несколько фьючерсов неблокирующим образом, автоматически обрабатывая завершение отдельных задач и выдавая их результаты по мере их поступления.
Давайте создадим несколько асинхронных функций, которые имитируют различные задачи, и используем FuturesUnordered
для их одновременного выполнения. Сначала добавьте необходимые зависимости в ваш файл Cargo.toml:
[dependencies]
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"
Далее создайте асинхронные функции:
use std::time::Duration;
use tokio::time::sleep;
// Define the async functions that return a String.
async fn task_one() -> String {
sleep(Duration::from_secs(3)).await;
"Задача 1 выполнена".to_owned()
}
async fn task_two() -> String {
sleep(Duration::from_secs(1)).await;
"Задача 2 выполнена".to_owned()
}
async fn task_three() -> String {
sleep(Duration::from_secs(2)).await;
"Задача 3 выполнена".to_owned()
}
Мы определили три асинхронные функции task_one
, task_two
и task_three
, каждая из которых имитирует задачу с различной длительностью, используя tokio::time::sleep
. Теперь мы будем использовать FuturesUnordered
для одновременного выполнения этих задач:
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::pin::Pin;
use std::future::Future;
#[tokio::main]
async fn main() {
// Создайте FuturesUnordered с одинаковым типом для всех фьючерсов: Box<dyn Future<Output = String>>.
let mut tasks = FuturesUnordered::<Pin<Box<dyn Future<Output = String>>>>::new();
// Используйте Box::pin для создания не перемещаемой, выделенной кучей памяти для версии каждого future.
// Это гарантирует, что местоположение памяти future не изменится, как только оно начнет выполняться.
tasks.push(Box::pin(task_one()));
tasks.push(Box::pin(task_two()));
tasks.push(Box::pin(task_three()));
while let Some(result) = tasks.next().await {
println!("{}", result);
}
}
Когда вы запустите этот пример, вы увидите, что задачи выполняются одновременно, и их результаты печатаются по мере их завершения. Выходные данные могут отличаться по порядку, поскольку это зависит от времени выполнения каждой задачи:
Задача 2 выполнена
Задача 3 выполнена
Задача 1 выполнена
В этом примере мы определили три асинхронные функции task_one
, task_two
и task_three
, каждая из которых имитирует задачу с различной длительностью, используя tokio::time::sleep
. Мы создали FuturesUnordered
экземпляр под названием tasks
и поместили в него наши три задачи.
Как это работает?
Теория, лежащая в основе futures::stream::FuturesUnordered
, заключается в его реализации, которая сочетает в себе эффективное управление задачами и асинхронную модель программирования Rust. Вот обзор того, как это работает под капотом:
FuturesUnordered
- это структура данных, которая содержит набор фьючерсов. Внутренне он поддерживает два связанных списка фьючерсов — один для фьючерсов, выполненых, и другой для фьючерсов, которые еще не выполнены.- Когда вы добавляете фьючерс в
FuturesUnordered
с помощьюpush()
, оно изначально помещается в список не выполненых фьючерсов. Затем фьючерс регистрируется в асинхронной среде выполнения, чтобы получать уведомления, когда оно будет готово к выполнению прогресса (т.е. когда оно будет выполнено). - Когда фьючерс становится выполненым, он перемещается из списка “выполняемые” в список “выполненые”. Это происходит в результате того, что асинхронная среда выполнения уведомляет
waker
, связанный с фьючерсом. - Структура
FuturesUnordered
сама реализует трейтStream
. Когда вы вызываетеnext()
в экземпляреFuturesUnordered
, он опрашивает фьючерсы в списке выполненых. - Вызов
next().await
преобразуется в завершенный фьючерс из списка выполненых, когда таковое будет возможно. Если выполненных фьючерсов нет, он будет асинхронно ждать, пока фьючерс не станет выполненым. - Когда фьючерс выполнен, он удаляется из списка выполняемых, а результат возвращается next().await. Если фьючерс не завершен, он остается в списке выполняемых, и тем временем может выполняться другой фьючерс. Этот процесс гарантирует, что
FuturesUnordered
эффективно обрабатывает несколько фьючерсов одновременно, используя доступные ресурсы и избегая блокировки. - Если фьючерс отменён или отброшен, он будет удаляется из соответствующего списка (выполненых или выполняемых), гарантируя, что все ресурсы, связанные с фьючерсом, будут освобождены.
- Поток
FuturesUnordered
продолжает возвращать значения до тех пор, пока в списках выполненых или выполняемых есть фьючерсы. Как только все фьючерсы будут завершены или отменены, потокFuturesUnordered
вернет None, указывая, что поток завершен.
Итог
FuturesUnordered
- это жизнь асинхронной вечеринки. Это гарантирует, что ни один фьючерс не останется без внимания, и каждый фьючерс получит свой момент времени в центре внимания, когда придет подходящее время.
Мы видели, как FuturesUnordered
ведет два списка: один для фьючерсов выполненых и другой для тех, кто все еще ждет своего часа.
Используя возможности FuturesUnordered
, приложения Rust могут повысить производительность, эффективно управлять параллелизмом и максимально эффективно использовать ресурсы. Важно признать важность этого замечательного инструмента и признать, что асинхронное программирование в Rust может быть как эффективным, так и полезным.
Счастливого кодирования!