Преобразование с сохранением состояния на одном кадре данных в потоковой передаче Spark - PullRequest
4 голосов
/ 06 марта 2019

Я пытаюсь определить статус завершения на разных уровнях детализации.Например, регион является «полным», если все города в этом регионе завершены.

Я поддерживаю состояние на самом низком уровне (город) в памяти, используя следующий подход в Spark:

Шаг 1. Загрузите исходное состояние из таблицы Cassandra во фрейм данных Spark.

+----------+--------+--------+------------+
| country  | region |  town  | isComplete |
+----------+--------+--------+------------+
| Country1 | State1 | Town1  | FALSE      |
| Country1 | State1 | Town2  | FALSE      |
| Country1 | State1 | Town3  | FALSE      |
| Country1 | State1 | Town4  | FALSE      |
| Country1 | State1 | Town5  | FALSE      |
| Country1 | State1 | Town6  | FALSE      |
| Country1 | State1 | Town7  | FALSE      |
| Country1 | State1 | Town8  | FALSE      |
| Country1 | State1 | Town9  | FALSE      |
| Country1 | State1 | Town10 | FALSE      |
| Country1 | State1 | Town11 | FALSE      |
+----------+--------+--------+------------+

Шаг 2. Запустите обработку потока и, используя фреймы данных, созданные в каждой микропакете, попытайтесь обновить состояние вфрейм данных из шага 1 с использованием левого внешнего соединения.

Пакет 1:

+----------+--------+-------+------------+
| country  | region | town  | isComplete |
+----------+--------+-------+------------+
| Country1 | State1 | Town1 | TRUE       |
| Country1 | State1 | Town2 | TRUE       |
| Country1 | State1 | Town3 | TRUE       |
| Country1 | State1 | Town4 | TRUE       |
+----------+--------+-------+------------+

После применения Пакета 1:

+----------+--------+--------+------------+
| country  | state  |  town  | isComplete |
+----------+--------+--------+------------+
| Country1 | State1 | Town1  | TRUE       |
| Country1 | State1 | Town2  | TRUE       |
| Country1 | State1 | Town3  | TRUE       |
| Country1 | State1 | Town4  | TRUE       |
| Country1 | State1 | Town5  | FALSE      |
| Country1 | State1 | Town6  | FALSE      |
| Country1 | State1 | Town7  | FALSE      |
| Country1 | State1 | Town8  | FALSE      |
| Country1 | State1 | Town9  | FALSE      |
| Country1 | State1 | Town10 | FALSE      |
| Country1 | State1 | Town11 | FALSE      |
+----------+--------+--------+------------+

Моя идея состоит в том, что, сохраняя изменяемый фрейм данных, я смогу обновить егов каждом пакете и поддерживать общее состояние (например, глобальную переменную) в течение всего срока выполнения задания потоковой передачи.

Базовый набор данных составляет около 1,2 миллиона записей (приблизительно 100 МБ) и, как ожидается, будет масштабироваться до 10GB.

У меня проблемы с нехваткой памяти.Каждая партия занимает значительно больше времени, чем предыдущая.Также увеличивается количество этапов для одной и той же работы между партиями.В конечном итоге приложение завершается ошибкой с превышением предельных значений GC.

var statusDf = loadStatusFromCassandra(sparkSession)
ipStream.foreachRDD { statusMsgRDD =>
  if (!statusMsgRDD.isEmpty) {
    // 1. Create data-frame from the current micro-batch RDD
    val messageDf = getMessageDf(sparkSession, statusMsgRDD)

    // 2. To update, Left outer join statusDf with messageDf
    statusDf = updateStatusDf(sparkSession, statusDf, messageDf)

    // 3. Use updated statusDf to generate aggregations at higher levels
    // and publish to a Kafka topic
    // if a higher level (eg. region) is completed.
  }
}
...