Перейти к содержанию

Создание многопоточного HTTP-сервера в Rust

Posted on:23 июня 2023 г. at 08:42

Example Dynamic OG Image link

Введение

Ранее в статье мы узнали о том, как работает TCP и как создать HTTP-сервер в rust. В этой статье мы добавим многопоточность в приложение.

Создание медленного маршрута

Давайте создадим медленный маршрут, чтобы смоделировать опыт поведения нашего сервера к запросу, который занимает много времени. Чтобы начать, измените блок if в функции handle_connection, чтобы использовать еще один маршрут, /slow, в котором мы будем усыплять наш основной поток в течение 5 секунд.

let (status_line, filename) = match &req_line[..] {
    "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "response.html"),
    "GET /slow HTTP/1.1" => {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "response.html")
    }
    _ => ("HTTP/1.1 200 OK", "404.html"),
};

Здесь происходят две интересные вещи, во-первых, match оценивает как выражение, т.е. вы можете присвоить его переменной, и вычисленное значение будет передано.

Во-вторых, аргумент, переданный для соответствия &req _ line[..], очень интересен. Мы знаем & является инструкцией заимствования, но [..] является полным диапазоном среза, то есть [] означает создание slice(&str) из нашей строки req_line и.. означает сделать это для всего диапазона, что гораздо лучше, чем запись 0..(req_line.len()), что означает то же самое.

Простейшая многопоточность

Самый простой способ многопоточности - создать новый поток для каждого запроса, добавив его в итерацию прослушивателя потока.

thread::spawn(|| {
    handle_connection(stream);
});

Это просто и работает, но есть проблема, если большое количество запросов делается за короткий промежуток времени это может в конечном итоге породить большое количество потоков и переполнить систему.

Так что же нужно сделать, чтобы справиться с этой проблемой? Мы можем справиться с этой ситуацией, ограничив максимальное количество потоков, которые могут существовать в любой момент.

Давайте создадим пул потоков lib

Создайте файл под названием lib.rs, мы можем добавить весь код в main.rs, но стандартной практикой является создание крейта. Для создания файла необходимо присвоить ему имя lib.rs и cargo не будет генерировать двоичный файл каждый раз для него, имеющийся в нем код может повторно использоваться другими файлами.

Заметка - В реальном приложении код может закончиться вызовом других API или обработкой файловой системы, это может облегчить обработку ошибок, используя не блокирующую IO среду выполнения, тогда лучшим выбором будет Tokio.

Начните с добавления следующих строк в файл:

pub struct ThreadPool;

impl ThreadPool {
  pub fn new(size: usize) -> ThreadPool {
      ThreadPool
  }
  pub fn execute<F>(&self, f: F)
   where
    F: FnOnce() + Send + 'static,
    {
    }
}
fn main() {
  // фрагмент кода
  let pool = ThreadPool::new(4);

  // фрагмент кода

  for stream in listener.incoming() {
    // фрагмент кода
    pool.execute(|| {
        handle_connection(stream);
    });
  }
}

Здесь struct ThreadPool является пустой структурой, т.е. не имеет параметров. Ниже две функции new и execute, new - создать новый экземпляр нашей структуры, а execute - выполнить его.

&self должно отдавать ссылку (называется заимствованием) на текущий экземпляр, и f: F является дженериком, слово where помогает определить тип, связанный с дженериком F (это также может быть в той же строке). Это означает, что F может быть только FnOnce(), что означает однократно вызываемую функцию без возврата, Send тип значения, который будет безопасен для передачи в другой поток, 'static означает, что он будет иметь статическое время жизни, и, следовательно, любые значения, передаваемые ему, должны быть либо со статическим временем жизни, либо принадлежать данным, а не заимствовать.

Внедрение новой функции

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
  pub fn new(size: usize) -> ThreadPool {
      assert!(size > 0);

      let mut threads = Vec::with_capacity(size);

      for _ in 0..size {
          // создать некоторые потоки и сохранить их в векторе
      }

      ThreadPool { threads }
  }

  // фрагмент кода
}

То, что происходит здесь довольно просто, мы добавили именованные переменные потоки в пуле потоков структуры, с надеждой на сохранение вектора потоков. Поэтому мы используем тот же тип, что и stdlib::thread::spawn, но наша программа будет иметь проблему, то есть thread::spawn не может делегировать задачу будущему времени, но мы получим фактическую задачу в функции execute, которая придет после new функции. Так что мы можем сделать? Давайте внедрим Worker для наших потоков.

Создание структуры Worker

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

Также измените ThreadPool, чтобы он был вектором Worker.

pub struct ThreadPool {
    threads: Vec<Worker>,
}

Несмотря на то, что мы изолировали thread::spawn, порожденный воркерами вне структуры ThreadPool, мы все еще не решили проблему делегирования задач в будущее. Это происходит потому, что мы не полностью внедрили структуру Worker.

Добавление каналов

Наши воркеры нуждаются в структуре данных производителя, где задачи (запросы HTTP) могут быть отправлены с одного конца и выбраны в соответствии с доступностью с другого конца. Для этого сценария Rust предоставляет thread-safe канал, по которому задачи могут передаваться из одного потока (первичный поток) в другой (рабочий поток).

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

impl ThreadPool {
  // фрагмент кода
  pub fn new(size: usize) -> ThreadPool {
      assert!(size > 0);

      let (sender, receiver) = mpsc::channel();

      let mut workers = Vec::with_capacity(size);

      for id in 0..size {
          workers.push(Worker::new(id));
      }

      ThreadPool { workers, sender }
  }

  // фрагмент кода
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

Рассмотрим происходящее здесь:

  1. mpsc::channel - MPSC означает нескольких производителей, одного потребителя, функцию канала, создает буфер, в котором несколько производителей могут добавлять задачи, и один потребитель может потреблять. Эта функция возвращает кортеж отправителя и получателя, которые, как предполагает название, являются замыканиями для отправки и приема информации по каналу.
  2. Job структура - 2 новых ключевых слова для просмотра в этой структуре, первый dyn, который объявляет дженерик, чтобы иметь динамическую диспетчеризацию (я добавил примечание ниже, чтобы понять, что это), тип Box используется, чтобы дать смарт-указатель поверх любого дженерика, как std::unique_ptr<T> в C++ (но не точно).
  3. Предостережение - Мы видели mpsc::channel предоставляет только одного потребителя и нескольких производителей, но это вызовет у нас проблемы. Во-первых, это небезопасно, поэтому нам нужен lock или mutex, чтобы работать на этом приемнике, во-вторых, этот приемник должен совместно использоваться в нескольких потоках, следовательно, требуется общая ссылка.

На языке программирования дженерики могут обрабатываться одним из двух способов - статической или динамической. При статической отправке различные возможные типы дженерик-программы выводятся во время компиляции и имеют отдельные блоки кода сборки, связанные с каждым типом. Это может сократить время выполнения, и является поведением по умолчанию, но с проблемой того, что должно произойти, если все типы не могут быть определены или мы не хотим создавать отдельные кодовые блоки для каждого дженерика. Здесь происходит динамическая отправка, что означает, что тип дженерик будет передан в среду выполнения в виде закрытой коробки и будет выведен во время выполнения. Это может быть медленнее, но часто обеспечивает большую гибкость.

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
      // фрагмент кода
    }
}

также обновить в пуле потоков new

workers.push(Worker::new(id, Arc::clone(&receiver)));

Arc тип, который дает нам общую ссылку приемника, он похож на std::shared_ptr<T> в C++ (опять же, не точно).

Реализовать задание потока

let thread = thread::spawn(move || loop {
    let job = receiver.lock().unwrap().recv().unwrap();

    println!("Worker {id} получил работу; выполнение.");

    job();
});

Worker { id, thread }

добавить это в исполняемый пул потоков

self.sender.send(Box::new(f)).unwrap();

Ключевое слово move - это способ объявить перемещение выражения, т.е. здесь выражение после ключевого слова move будет принадлежать порожденному потоку. loop просто для достижения бесконечного цикла.

В заключение

Здесь мы узнали о многопоточности rust и некоторых важных концепциях, связанных с многопоточностью rust.