Я пытаюсь определить статус завершения на разных уровнях детализации.Например, регион является «полным», если все города в этом регионе завершены.
Я поддерживаю состояние на самом низком уровне (город) в памяти, используя следующий подход в Spark:
Шаг 1. Загрузите исходное состояние из таблицы Cassandra во фрейм данных Spark.
+----------+--------+--------+------------+
| country | region | town | isComplete |
+----------+--------+--------+------------+
| Country1 | State1 | Town1 | FALSE |
| Country1 | State1 | Town2 | FALSE |
| Country1 | State1 | Town3 | FALSE |
| Country1 | State1 | Town4 | FALSE |
| Country1 | State1 | Town5 | FALSE |
| Country1 | State1 | Town6 | FALSE |
| Country1 | State1 | Town7 | FALSE |
| Country1 | State1 | Town8 | FALSE |
| Country1 | State1 | Town9 | FALSE |
| Country1 | State1 | Town10 | FALSE |
| Country1 | State1 | Town11 | FALSE |
+----------+--------+--------+------------+
Шаг 2. Запустите обработку потока и, используя фреймы данных, созданные в каждой микропакете, попытайтесь обновить состояние вфрейм данных из шага 1 с использованием левого внешнего соединения.
Пакет 1:
+----------+--------+-------+------------+
| country | region | town | isComplete |
+----------+--------+-------+------------+
| Country1 | State1 | Town1 | TRUE |
| Country1 | State1 | Town2 | TRUE |
| Country1 | State1 | Town3 | TRUE |
| Country1 | State1 | Town4 | TRUE |
+----------+--------+-------+------------+
После применения Пакета 1:
+----------+--------+--------+------------+
| country | state | town | isComplete |
+----------+--------+--------+------------+
| Country1 | State1 | Town1 | TRUE |
| Country1 | State1 | Town2 | TRUE |
| Country1 | State1 | Town3 | TRUE |
| Country1 | State1 | Town4 | TRUE |
| Country1 | State1 | Town5 | FALSE |
| Country1 | State1 | Town6 | FALSE |
| Country1 | State1 | Town7 | FALSE |
| Country1 | State1 | Town8 | FALSE |
| Country1 | State1 | Town9 | FALSE |
| Country1 | State1 | Town10 | FALSE |
| Country1 | State1 | Town11 | FALSE |
+----------+--------+--------+------------+
Моя идея состоит в том, что, сохраняя изменяемый фрейм данных, я смогу обновить егов каждом пакете и поддерживать общее состояние (например, глобальную переменную) в течение всего срока выполнения задания потоковой передачи.
Базовый набор данных составляет около 1,2 миллиона записей (приблизительно 100 МБ) и, как ожидается, будет масштабироваться до 10GB.
У меня проблемы с нехваткой памяти.Каждая партия занимает значительно больше времени, чем предыдущая.Также увеличивается количество этапов для одной и той же работы между партиями.В конечном итоге приложение завершается ошибкой с превышением предельных значений GC.
var statusDf = loadStatusFromCassandra(sparkSession)
ipStream.foreachRDD { statusMsgRDD =>
if (!statusMsgRDD.isEmpty) {
// 1. Create data-frame from the current micro-batch RDD
val messageDf = getMessageDf(sparkSession, statusMsgRDD)
// 2. To update, Left outer join statusDf with messageDf
statusDf = updateStatusDf(sparkSession, statusDf, messageDf)
// 3. Use updated statusDf to generate aggregations at higher levels
// and publish to a Kafka topic
// if a higher level (eg. region) is completed.
}
}