Как обработать работу, подобную карте Spark, на Dataframe, где каждый вывод строки зависит от текущей строки и предыдущего вывода? - PullRequest
0 голосов
/ 05 ноября 2019

Сможет ли кто-нибудь помочь с приведенной ниже проблемой?

Формальная проблема Как обработать задание Spark, для которого требуется, с учетом фрейма данных типа A, вывести фрейм данныхтипа B, сохраняющий мощность. Функция карты - это не просто a<sub>i</sub> => b<sub>i</sub>, а (a<sub>i</sub>, b<sub>i-1</sub>) => b<sub>i</sub>. Начальное значение b<sub>0</sub> известно и не является проблематичным.

Реальный сценарий У меня есть информационный кадр торговых ордеров на бирже, в котором перечислены люди, которые вставляют новые ордера, удаляют их, обновляют ихпо цене и объему и тд. В такой ситуации можно определить книгу заказов (подробнее здесь ) как упорядоченный набор на каждой отметке времени, действующих на тот момент заказов. Такой набор явно состоит из новых изменений, связанных со временем t (некоторые новые заказы приходят, некоторые старые уходят), а также из всех предыдущих изменений. В приведенной выше терминологии A - это событие заказа, B - книга, которая получается из события, учитывая его предыдущее состояние.

То, что у меня в данный момент есть У меня было этоПроблема около года назад, и я решил ее с неприятной, но работающей уловкой. Это упрощенная версия кода, который у меня есть (на самом деле я вывожу некоторую статистику, учитывая книгу и идентификатор строки):

    val orderEvents: RDD[OrderEvent] = dfSorted.rdd.repartition(1)
    val orderBookAcc = new LobAccumulator(new ConsistentLob(dateValue(sparkSession)))
    sparkSession.sparkContext.register(orderBookAcc, s"lastLob_$strippedID")
    orderEvents.zipWithIndex().map { case (event, _) =>
      orderBookAcc.add(event)
      orderBookAcc.value
    }

Это работает, потому что я использую аккумулятор и заставляю логику выполняться последовательнои без необходимости объединять частичные результаты путем подгонки всех данных к одному единственному разделу

Недавние ограничения Приведенное выше решение, помимо того, что imho довольно не элегантно, не масштабируется. На сегодняшний день это не было проблемой, поскольку данные для этого задания составляли около 10 тыс. Строк, но недавно мы получили требования поддержки новых потоков данных, которые могут принести до 40 млн. Строк в одном кадре данных. Модель с одним разделом кажется неприемлемой в этом случае, особенно потому, что кластер не может дать нам столько памяти на узел (3 г на процесс на момент написания)

Технические характеристики НадеюсьЭто не очень актуально, но для ясности я работаю в кластере Cloudera Spark, приложение отправлено в режиме драйвера (драйвер Windows). Версия Spark 2.4.4, и я работаю в Scala 2.11.2

...