Howtojs

Применение асинхронных генераторов

23 февраля 2021 г. • ☕️ 7 мин.

Использование асинхронных итераторов и генераторов довольно редко встречается на практике в коде, хотя они могут упростить код во многих ситуациях.

Async Iterators/Generators

С приходом ECMAScript 6 появились итераторы и генераторы, позволяющие удобно перебирать данные синхронно.

Асинхронные итераторы и генераторы - следующий шаг, они помогают удобно перебирать данные, поступающие асинхронно.

Синтаксис

Для объявления асинхронного генератора, нужно добавить ключевое слово async и специальный символ звездочки *.

В теле генератора, мы можем использовать ключевое слово await для работы с асинхронным кодом, а также yield для отдачи одного из значений наружу тому, кто эти значения потребляет (перебирает).

async function* generator() {  yield 1
  yield 2
  await new Promise(resolve => setTimeout(resolve, 500))
  yield 3
  yield 4
}

Для получения (перебора) значений из асинхронного генератора или итератора, используется новый синтаксис цикла for-await-of. Внутри такого цикла, так же можно использовать ключевое слово await.

for await (const value of generator()) {  await new Promise(resolve => setTimeout(resolve, 100))
  console.log(value)
  await new Promise(resolve => setTimeout(resolve, 100))
}

Применение

Теперь посмотрим, где это может быть удобно в реальных ситуациях.

Чтение большого файла по частям

Классический способ с использованием потоков (streams).

fs.createReadStream(__filename, { highWaterMark: 32 })  .on("data", chunk => {
    console.log(chunk)
  })
  .on("end", () => console.log("end"))

Такой код не умеет работать с async/await. Даже если мы напишем callback для события data, как async функцию, следующее событие data, может произойти раньше, чем завершиться обработка текущего.

Такое поведение не позволяет легко выстраивать обработку большого файла последовательно. Поэтому приходилось использовать ручную остановку потока (stream), вспомогалтельные функции .pipe для построение pipeline либо сторонние библиотеки, что в целом могло значительно усложнить код для простой на первый взгляд задачи.

Кроме того, необходимо было уделять особое внимание обработке ошибок с использованием события error.

В node.js уже есть поддержка асинхронной итерации в модулях stream и fs. Поэтому мы можем переписать код таким образом:

for await (const value of fs.createReadStream(__filename)) {  console.log(value)
}
console.log("end")

Такой код проще читать. Из асинхронного итератора можно потреблять и асинхронно обрабатывать значения последовательно, все остальное сделанно за нас.

Подсчет количества обработанных значений (декорирование)

Несколько генераторов можно комбинировать в pipeline. Получается декорирование для генераторов.

async function* withCounter(stream) {
  let count = 0

  for await (const value of stream) {
    count = count + 1
    yield { count, value }
  }
}

const stream = fs.createReadStream(__filename)

const reader = withCounter(stream)
for await (const { value, count } of reader) {
  console.log(value)
  console.log(count)
}

Чтение файла по логическим блокам

Другой пример построения pipeline, когда мы хотим прочитать файл, разбив его на части не по длине в байтах, а по символу начала новой строки \n, либо любому другому логическому разделителю блока.

const createReader = require("chunk-reader")
const fs = require("fs")

const stream = fs.createReadStream(__filename)

const reader = createReader(stream, { delimiter: "\n" })

for await (const line of reader) {
  console.log(line)
}

Исходный код готового модуля chunk-reader можно посмотреть здесь.

Перебор сущностей при пагинации API

Постов может быть очень много, что не позволит загрузить их за раз.

async function* getBlogPosts() {
  let page = 1

  while (true) {
    const page = await fetch(`https://howtojs.ru/?page=${page}`)

    if (!page.hasNext) break

    for (const post of page.posts) {
      yield post
    }
  }
}

for await (const post of getBlogPosts()) {
  console.log(post)
}

Удобная абстракция, не правда ли?

Обработка запросов на сервере

Пример взят из Deno

import { serve } from "https://deno.land/std@0.88.0/http/server.ts";

const s = serve({ port: 8000 });

for await (const req of s) {
  req.respond({ body: "Hello World\n" });
}

Перебор записей из базы данных с помощью итерации курсора

const users = knex.select('*').from('users').stream();

for await (const user of users) {
  console.log(user)
}

Здесь мы перебираем все записи из БД по одной, не загружая их разом в память, что полезно при работе с очень крупными таблицами/коллекциями.

Другие идеи

  • обработка событий (event) из websocket
  • получение данных из TCP Socket
  • работа с очередями задач (Rabbitmq, AWS SQS)
  • рекурсивное чтение директорий
  • парсинг и обработка различных файлов

Заключение

Обработка данных с использованием асинхронных итераторов/генераторов, во многом заменяет потоки streams и event emitter, как когда-то Promise и async/await пришли на замену callbacks. Асинхронный код, начинает выглядеть, как синхронный, не создавая при этом блокировок событийного цикла event loop.

Если вы видете, что данные с которыми предстоит работать, слишком большие, либо представляют собой бесконечный поток, стоит задуматься, а не использовать ли здесь асинхронный генератор?