Перейти к содержанию

FuturesUnordered - Эффективный способ управления несколькими фьючерсами в Rust

Posted on:6 апреля 2023 г. at 10:18

Example Dynamic OG Image link В этой статье мы собираемся изучить futures::stream::FuturesUnordered, мощный и эффективный инструмент для одновременного выполнения нескольких асинхронных задач. Эта утилита позволяет нам опрашивать несколько фьючерсов неблокирующим образом, автоматически обрабатывая завершение отдельных задач и выдавая их результаты по мере их поступления.

Давайте создадим несколько асинхронных функций, которые имитируют различные задачи, и используем FuturesUnordered для их одновременного выполнения. Сначала добавьте необходимые зависимости в ваш файл Cargo.toml:

[dependencies]
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"

Далее создайте асинхронные функции:

use std::time::Duration;
use tokio::time::sleep;

// Define the async functions that return a String.
async fn task_one() -> String {
    sleep(Duration::from_secs(3)).await;
    "Задача 1 выполнена".to_owned()
}
async fn task_two() -> String {
    sleep(Duration::from_secs(1)).await;
    "Задача 2 выполнена".to_owned()
}
async fn task_three() -> String {
    sleep(Duration::from_secs(2)).await;
    "Задача 3 выполнена".to_owned()
}

Мы определили три асинхронные функции task_one, task_two и task_three, каждая из которых имитирует задачу с различной длительностью, используя tokio::time::sleep. Теперь мы будем использовать FuturesUnordered для одновременного выполнения этих задач:

use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::pin::Pin;
use std::future::Future;

#[tokio::main]
async fn main() {
    // Создайте FuturesUnordered с одинаковым типом для всех фьючерсов: Box<dyn Future<Output = String>>.
    let mut tasks = FuturesUnordered::<Pin<Box<dyn Future<Output = String>>>>::new();
    // Используйте Box::pin для создания не перемещаемой, выделенной кучей памяти для версии каждого future.
    // Это гарантирует, что местоположение памяти future не изменится, как только оно начнет выполняться.
    tasks.push(Box::pin(task_one()));
    tasks.push(Box::pin(task_two()));
    tasks.push(Box::pin(task_three()));
    while let Some(result) = tasks.next().await {
        println!("{}", result);
    }
}

Когда вы запустите этот пример, вы увидите, что задачи выполняются одновременно, и их результаты печатаются по мере их завершения. Выходные данные могут отличаться по порядку, поскольку это зависит от времени выполнения каждой задачи:

Задача 2 выполнена
Задача 3 выполнена
Задача 1 выполнена

В этом примере мы определили три асинхронные функции task_one, task_two и task_three, каждая из которых имитирует задачу с различной длительностью, используя tokio::time::sleep. Мы создали FuturesUnordered экземпляр под названием tasks и поместили в него наши три задачи.

Как это работает?

Теория, лежащая в основе futures::stream::FuturesUnordered, заключается в его реализации, которая сочетает в себе эффективное управление задачами и асинхронную модель программирования Rust. Вот обзор того, как это работает под капотом:

  1. FuturesUnordered - это структура данных, которая содержит набор фьючерсов. Внутренне он поддерживает два связанных списка фьючерсов — один для фьючерсов, выполненых, и другой для фьючерсов, которые еще не выполнены.
  2. Когда вы добавляете фьючерс в FuturesUnordered с помощью push(), оно изначально помещается в список не выполненых фьючерсов. Затем фьючерс регистрируется в асинхронной среде выполнения, чтобы получать уведомления, когда оно будет готово к выполнению прогресса (т.е. когда оно будет выполнено).
  3. Когда фьючерс становится выполненым, он перемещается из списка “выполняемые” в список “выполненые”. Это происходит в результате того, что асинхронная среда выполнения уведомляет waker, связанный с фьючерсом.
  4. Структура FuturesUnordered сама реализует трейт Stream. Когда вы вызываете next() в экземпляре FuturesUnordered, он опрашивает фьючерсы в списке выполненых.
  5. Вызов next().await преобразуется в завершенный фьючерс из списка выполненых, когда таковое будет возможно. Если выполненных фьючерсов нет, он будет асинхронно ждать, пока фьючерс не станет выполненым.
  6. Когда фьючерс выполнен, он удаляется из списка выполняемых, а результат возвращается next().await. Если фьючерс не завершен, он остается в списке выполняемых, и тем временем может выполняться другой фьючерс. Этот процесс гарантирует, что FuturesUnordered эффективно обрабатывает несколько фьючерсов одновременно, используя доступные ресурсы и избегая блокировки.
  7. Если фьючерс отменён или отброшен, он будет удаляется из соответствующего списка (выполненых или выполняемых), гарантируя, что все ресурсы, связанные с фьючерсом, будут освобождены.
  8. Поток FuturesUnordered продолжает возвращать значения до тех пор, пока в списках выполненых или выполняемых есть фьючерсы. Как только все фьючерсы будут завершены или отменены, поток FuturesUnordered вернет None, указывая, что поток завершен.

Итог

FuturesUnordered - это жизнь асинхронной вечеринки. Это гарантирует, что ни один фьючерс не останется без внимания, и каждый фьючерс получит свой момент времени в центре внимания, когда придет подходящее время.

Мы видели, как FuturesUnordered ведет два списка: один для фьючерсов выполненых и другой для тех, кто все еще ждет своего часа.

Используя возможности FuturesUnordered, приложения Rust могут повысить производительность, эффективно управлять параллелизмом и максимально эффективно использовать ресурсы. Важно признать важность этого замечательного инструмента и признать, что асинхронное программирование в Rust может быть как эффективным, так и полезным.

Счастливого кодирования!