Потоковая обработка данных

в функциональном стиле на Go

Арсений Жижелев, Праймтолк / zhizhelev@primetalk.ru

План

  • Введение
    • задачи потоковой обработки
    • отличие от пакетной обработки ("main"-style)
    • трансдьюсеры (автоматы с выходом)
  • Примеры использования
Введение. Задачи потоковой обработки
  • чтение и обработка файла "на лету" (ndjson, csv, ...)
  • загрузка и скачивание файлов
  • обработка аудио-потока в телефонии
  • распознавание речи в поточном режиме
  • задачи с минимизацией задержки ответа (latency)
  • поток событий (обработка, фильтрация, сохранение, передача дальше)
  • веб-сервис с потоком входных запросов
  • IoT
Как решать?
  • монолитный main
  • unix-way: потоки текста
  • go-way: каналы+горутины
  • фреймворки/библиотеки потоковой обработки:
    • представление блоков
    • соединение блоков
  • например, библиотека goio:
    • блоки - пользовательские функции
    • соединение блоков - функции высшего порядка, комбинаторы

Конечный автомат с выходом (трансдьюсер)

Пример конечного автомата
  • вход/выход - потоки символов
  • ожидания зависят от состояния
  • может порождать выход спонтанно
  • конечный объём памяти (bounded)
  • композиция потоковых обработчиков
Функции высшего порядка (комбинаторы), map

map[F[_]]:: [A,B] => (A=>B) => F[A] => F[B]
  • обобщённая (generic) по входным и выходным типам данных [A,B]
  • на вход принимает функцию (A => B)
  • работает с функторами F[_] (коллекции List[_], потоки данных Stream[_], ...)
  • возвращает функцию (или является функцией), преобразующую коллекцию/поток целиком F[A] => F[B]
Функции высшего порядка (комбинаторы), flatMap (bind)

flatMap[F[_]]:: [A,B] => F[A] => (A=>F[B]) => F[B]
bind[F[_]]:: [A,B] => (A=>F[B]) => F[A] => F[B]
  • отличается от map сигнатурой пользовательской функции
  • (функтор F[_] с такой операцией называется монадой)
  • выразительная сила этого комбинатора огромна
Сопоставление с шаблоном, деконструктор

Значение F[A] может иметь несколько структурных форм, укладывающихся в шаблоны. Каждую форму можно обработать отдельно.

unapply[F[_]]:: [A,B] => F[A] => (*[A]=>B)* => B
  • (*[A]=>B)* - условное обозначение, несколько функций на все случаи, возможные в F[_]
Сопоставление с шаблоном, примеры
  • List.unapply::[A,B] => List[A] => /*cons*/((A,List[A])=>B), /*nil*/(=>B) => B - структура списка полностью разбирается парой функций - для пустого и непустого списка
  • Boolean.unapply::[B] => Boolean => /*true*/(=>B), /*false*/(=>B) => B - if - тоже является деконструктором типа Boolean
  • enum Enum = e1|e2|e3
    Enum.unapply:: Enum => /*e1*/(=>B), /*e2*/(=>B), /*e3*/(=>B) => B
Кодирование трансдьюсера функцией
  • входной алфавит - A
  • выходной алфавит - B
  • состояние - S
  • поток символов - F[_]

трансдьюсер:

A => S => (S, F[B])
  • по входному символу
  • и состоянию
  • порождает состояние и (небольшой) выходной поток

"Алгебры": лирическое отступление

  • погружение обычных величин в "алгебру" F[T]
  • комбинирование средствами алгебры (map, flatMap, merge, etc.)
  • возврат из алгебры в обычное пространство

"Алгебры": IO

  • IO[T] - вычисление, возвращающее величину T (или исключение)
  • комбинаторы позволяют сконструировать будущее вычисление
  • io.UnsafeRunSync(iot) - фактически запустить вычисление
  • преимущество - сохранение чистоты функций до последнего момента

"Алгебры": Stream

  • Stream[T] - абстрактный поток значений типа T, который может быть превращён в вычисление io.IO[[]T]
  • удобные комбинаторы, позволяющие конструировать сложные алгоритмы обработки
  • возврат в обычное пространство - запись в файл, возврат потока по http, побочные эффекты, генерация событий
Библиотека goio
  • по мотивам cats, fs2
  • кроме потоков - IO, Pair, slice.*, sets.*, ...
  • безопасный шаг вычислений - IO[A]
  • поток - Stream[A]
  • ограничения Go
Библиотека goio. IO[A]
type IO[A any] Continuation[A]
type Continuation[A any] func() ResultOrContinuation[A]
type ResultOrContinuation[A any] struct {
	Value        A
	Error        error
	Continuation *Continuation[A]
}
  • шаг - либо успешный, либо ошибка
  • panic - в ошибку
  • Continuation[A] - для "трамполайнинга"
Библиотека goio. Stream[A]
func StreamFold[A any, B any](
  stm Stream[A],
  onFinish func() io.IO[B],
  onValue func(a A, tail Stream[A]) io.IO[B],
  onEmpty func(tail Stream[A]) io.IO[B],
  onError func(err error) io.IO[B],
) io.IO[B] 
  • каждый шаг - безопасный IO
  • деконструктор потока
  • поток может завершиться
  • может породить один символ
  • может сработать вхолостую
  • может выдать ошибку
Библиотека goio. Stream[A]. реализация
type Stream[A any] io.IO[StepResult[A]]
type StepResult[A any] struct {
  Value        A
  HasValue     bool // models "Option[A]"
  Continuation Stream[A]
  IsFinished   bool // true when stream has completed
}
  • шаг IO порождает либо ошибку, либо результат
  • результат
    • конец потока
    • один символ
    • пусто
Пример 1. Числа Фибоначчи (fs2)

        def fib(prev: BigInt, b: BigInt): Stream[Pure, BigInt] =
            Stream.emit(b) ++ fib(b, prev + b)
        val fib01 = fib(0, 1)
              

        assert(fib01.take(5).toList == List(1, 1, 2, 3, 5))
              

        assert(fib01
            .map(_.pow(2))
            .filter(_ % 2 == 1)
            .take(5).toList == List(1, 1, 9, 25, 169)
        )
              

        assert(fib01.drop(55).head.toList ==
                    List(BigInt(225_851_433_717L))
              
Пример 1. Числа Фибоначчи (goio)

func Fib(prev int, b int) Stream[int] {
  return AndThenLazy(Emit(b), func() Stream[int] {
    return Fib(b, prev+b)
  })
}

var fibs01 = stream.Fib(0, 1)

func TestFibs(t *testing.T) {
	powered := stream.Map(fibs01, pow2)
	filtered := stream.FilterNot(powered, isEven)
	filtered5 := stream.Take(filtered, 5)
	expected := []int{1, 1, 9, 25, 169}
	assert.ElementsMatch(t, expected, UnsafeStreamToSlice(t, filtered5))
}

hIO := stream.Head(stream.Drop(fib01, 55))
assert.Equal(t, int64(225851433717), UnsafeIO(t, hIO))
Пример 1. Числа Фибоначчи (3)
  • ленивые вычисления
  • бесконечный поток, вычисляемый на лету
  • комбинаторы для формирования программы
  • отложенное исполнение
Возможности библиотеки goio
  • строго типизированный код
  • гарантия корректной обработки ошибок
  • обработка бесконечных потоков с постоянной памятью (GC)
  • сборка программы комбинаторами из пользовательских функций
  • параллельная обработка
Недочёты (возможности улучшения)
  • закрытие ресурсов по окончании потока
  • производительность. На один элемент потока: 31 выделение памяти, 1Киб, 1.4 мкс
Заключение
  • поточные библиотеки до generic'ов - не очень удобны
  • благодаря generic'ам в язык Go стал более высокоуровневым
  • goio - используется в реальном приложении
  • класс задач обработки данных с минимальными задержками (данные обрабатываются по мере поступления)
Ссылки

Спасибо за внимание

Вопросы?

Арсений Александрович Жижелев, Праймтолк / zhizhelev@primetalk.ru