В этой статье я дам базовый обзор создания слоя промежуточного программного обеспечения с использованием Tower и Axum. Поскольку я новичок в этом, я постараюсь сохранить информацию на высоком уровне, включив в нее некоторые ошибки, с которыми я столкнулся на этом пути.
Ограничитель скорости
Очень быстро ограничитель скорости, который я выбрал для создания, представляет собой ограничитель скорости корзины токенов. Представьте себе ведро, которое пополняет жетоны каждые 5 секунд. Каждый входящий запрос будет потреблять 1 токен. Если в корзине больше нет токенов, промежуточное программное обеспечение вернет клиенту HTTP 429.
Чтобы все было проще, пополнение токена будет рассчитано при поступлении запроса клиента. Отслеживая время последнего обновления, служба может рассчитать, пришло ли время обновлять токены.
Промежуточное программное обеспечение приложения
Tower предоставляет концепцию слоев, абстракцию, которая позволяет приложению инкапсулировать сервисы в повторно используемые компоненты.
Сначала давайте создадим сервисный уровень для обработки входящих запросов. Инициализируйте ограничение скорости для корзины токенов, объявив количество запросов для приема и продолжительность, после которой токены будут обновлены.
Когда мы реализуем свойство layer
, обратите внимание, как каждый слой содержит базовый сервис. Метод layer()
также передается в сервисе. Эта служба становится обернутой службой текущего уровня. Таким образом, запрос должен быть обработан текущим уровнем, прежде чем обращаться к внутренней службе. В этом примере мы создадим и будем использовать пользовательский сервис.
pub struct RateLimitLayer {
rate: Rate,
}
impl RateLimitLayer {
/// Create new rate limit layer.
pub fn new(num: usize, per: Duration) -> Self {
let rate = Rate::new(num, per);
RateLimitLayer { rate }
}
}
impl<S> Layer<S> for RateLimitLayer {
type Service = RateLimit<S>;
fn layer(&self, service: S) -> Self::Service {
RateLimit::new(service, self.rate)
}
}
Сервис
Структура RateLimit
инкапсулирует внутреннюю службу, которую она будет защищать. Остальные атрибуты присутствуют для реализации ограничителя скорости.
#[derive(Debug)]
pub struct RateLimit<T> {
inner: T,
rate: Rate,
last_token_refresh: Arc<Mutex<Instant>>,
tokens: Arc<Mutex<usize>>,
permit_semaphore: PollSemaphore,
permit: Option<OwnedSemaphorePermit>,
}
Следует отметить, что несколько переменных помечены как Arc<Mutex>
. Это необходимо, поскольку службы должны реализовывать трейт клонирования. Tower / Axum создает несколько сервисов в нашем промежуточном программном обеспечении, поэтому нам нужно защитить токены и рефреш токены, чтобы они не были изменены одновременно.
Поскольку структура содержит OwnedSemaphorePermit
, мы не можем получить Clone. Вместо этого, при клонировании сервиса, мы создаем новый сервис с тем же семафором, но с разрешением в не распределенном состоянии.
impl<T: Clone> Clone for RateLimit<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
rate: self.rate.clone(),
last_token_refresh: self.last_token_refresh.clone(),
tokens: self.tokens.clone(),
permit_semaphore: self.permit_semaphore.clone(),
permit: None,
}
}
}
Теперь мы можем реализовать служебный трейт. Следует отметить, что наш тип ошибки ограничен BoxError
. Это сделано намеренно. Если мы определим ошибки на N уровнях глубоко в нашем сервисе и захотим получить к нему доступ, у нас будет неприятная вложенная строка Service1.Service2.Service3.Error
. Вместо этого, помещая все наши ошибки в BoxError
, мы сможем напрямую ссылаться на ошибку.
Как и во всех службах, существует функция poll_ready
, которая возвращает Poll::ready
, если служба готова обработать запрос, или Poll::pending
, если службе все еще требуется некоторое время для обработки запроса. Обратите внимание, что эта служба вместе с внутренней службой должна быть готова до того, как будет восстановлен Poll::ready
.
Функция call
выполнит некоторую логику перед вызовом внутренней службы и возвратом результата.
impl<S, Request> Service<Request> for RateLimit<S>
where
S: Service<Request>,
S::Error: Into<BoxError>,
{
type Response = S::Response;
type Error = BoxError;
type Future = ResponseFuture<S::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
...
}
fn call(&mut self, request: Request) -> Self::Future {
...
}
}
poll_ready() и call()
В poll_ready()
мы сначала проверяем, есть ли у нас уже разрешение. Если это так, мы можем просто проверить, готова ли внутренняя служба, а затем вернуться. С другой стороны, если у нас нет разрешения, то нам нужно посмотреть, сможем ли мы его приобрести.
Сначала мы попробуем обновить токены, если сможем. Далее, если токенов достаточно, мы можем получить разрешение от семафора. Независимо от того, готова внутренняя служба или нет, теперь у нас есть разрешение. Это может вызвать беспокойство, если клиент удерживает разрешение в течение длительного периода времени, фактически не вызывая и не освобождая разрешение, поэтому имейте это в виду при настройке количества семафоров.
Если токенов нет, то мы отправляем ошибку внутри Poll::Ready
. Возможно, вам интересно, почему бы не вернуть Poll::Pending
? Что ж, оказывается, что состояние ожидания заставит клиента активно ждать, пока служба не будет готова. Чтобы вернуть ошибку, мы должны передать ошибку в состоянии Poll::Ready
. Ошибка перехвачена и обработана в основном вызове Axum в main.rs
, где клиенту возвращается соответствующий ответ и код состояния.
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
if self.permit.is_none() {
// Попробуйте обновить токены
self.refresh_tokens();
let mut tokens_lock = self.tokens.lock().unwrap();
if *tokens_lock > 0 {
self.permit = ready!(self.permit_semaphore.poll_acquire(cx));
*tokens_lock -= 1;
debug_assert!(
self.permit.is_some(),
"Семафор RateLimit никогда не закрывается, поэтому poll_acquire \
никогда не должен подвести",
);
} else {
println!("Poll_ready, без токенов");
return Poll::Ready(Err(Box::new(RateLimitError(()))));
}
}
// После того, как мы получили разрешение (или если оно у нас уже было),
// опросите внутреннюю службу.
self.inner.poll_ready(cx).map_err(Into::into)
}
В методе thecall()
мы пытаемся использовать разрешение, а затем вызываем внутреннюю службу. Мы могли бы напрямую вернуть Future
, или мы могли бы выполнить некоторую дальнейшую обработку с ответом. Для практики я выбрал последнее и создал ResponseFuture
.
fn call(&mut self, request: Request) -> Self::Future {
// Возьмите разрешение
let permit = self
.permit
.take()
.expect("максимальное количество запросов; poll_ready должны быть вызваны в первую очередь");
// Вызовите внутреннюю службу
let future = self.inner.call(request);
ResponseFuture {
response_future: future,
_permit: permit,
}
}
ResponseFuture
Хотя я мог бы использовать свой простой ResponseFuture
, было бы полезнее увидеть более реалистичный пример. Следующий код взят из примера TimeoutLayer
в Tower (исходный код).
Что касается тайм-аута, мы отслеживаем, как долго код выполнялся с помощью объекта Sleep tokio
. Обратите внимание, что мы используем #pin
для обоих объектов, потому что хотим предотвратить перемещение объектов во время их обработки.
pin_project! {
/// [Timeout] response future
///
/// [Timeout]: crate::timeout::Timeout
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
response: T,
#[pin]
sleep: Sleep,
}
}
impl<T> ResponseFuture<T> {
pub(crate) fn new(response: T, sleep: Sleep) -> Self {
ResponseFuture { response, sleep }
}
}
В будущей реализации обратите внимание, как мы можем выполнять дополнительные проверки в ожидании ответа. Если ответ готов, мы можем немедленно вернуть результат. Однако, если он не готов, мы проверяем режим ожидания, чтобы узнать, истек ли тайм-аут.
Кроме того, создав наш собственный ResponseFuture
, мы можем отделить некоторую логику от сервисного файла для упрощения обслуживания.
impl<F, T, E> Future for ResponseFuture<F>
where
F: Future<Output = Result<T, E>>,
E: Into<crate::BoxError>,
{
type Output = Result<T, crate::BoxError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
// Во-первых, попробуйте опросить future
match this.response.poll(cx) {
Poll::Ready(v) => return Poll::Ready(v.map_err(Into::into)),
Poll::Pending => {}
}
// Теперь проверьте спящий режим
match this.sleep.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(Err(Elapsed(()).into())),
}
}
}
Вывод
Создание слоев в Tower может быть сложной задачей при работе с типами и трейтами. Однако после некоторой практики становится легче выявлять закономерности и применять эти знания к своим собственным проблемам.