Как запустить непрерывный пакетный процесс в Spark - PullRequest
0 голосов
/ 16 мая 2018

Я пробовал и API-интерфейс spark batch, и API-интерфейс структурированной потоковой передачи, но я все еще новичок в этом и удивляюсь, как можно достичь таблицы измерений в реальном времени.

У меня есть сценарий использования для обновления двухмерной таблицы типа почти в реальном времени (задержка ~ 10 минут). Требуется, чтобы вывод последней работы был вводом новой работы. Мой подход состоит в том, чтобы запустить задание непрерывного пакетного запуска, которое возобновляется с самого начала после завершения всех этапов преобразования, а не завершается со статусом успеха. Это похоже на потоковое задание, но запускает пакеты внутри и блокирует (т. Е. Новые данные не читаются и не обрабатываются, последние прочитанные данные объединяются в итоговую таблицу измерений). Является ли это возможным?

1 Ответ

0 голосов
/ 17 мая 2018

Выполнение всего за один запуск Spark является обязательным требованием?Я имею в виду, вы хотите создать кластер и последовательно выполнять одну и ту же обработку данных, когда один завершается, запускается другой?

Вместо этого вы можете использовать в качестве Apache Airflow инструмент оркестровки, который поддерживает последовательное выполнение задач. Вы можете взглянуть на https://airflow.apache.org/code.html и выполнить поиск по запросу «depen_ounds_p_past».Следующее задание будет выполнено только после завершения его предшественника.

Другим решением может быть потоковое задание Spark, запущенное с падающими окнами в течение 10 минут и принимающее выходные данные предыдущего задания в качестве входных данных.Но убедитесь, что у вас достаточно памяти, чтобы хранить эти предметы в течение 10 минут.

=====================================

edit:

Прочитав ваш комментарий @Stella, я получил новую версию своего первоначального предложения.Но так как я не знаю, каковы ваши источники и приемники данных, я сделал несколько элементарных версий, основанных на РСУБД (H2) и локальных текстовых файлах.

Таким образом, в базу данных я добавил столбец, хранящийверсия строк, которую мы хотим получить в каждой итерации.Я также добавил столбец для обработки разбиения (он должен быть числовым, о котором я написал пост здесь: http://www.waitingforcode.com/apache-spark-sql/partitioning-rdbms-data-spark-sql-jdbc/read). Теперь он работает как в следующей схеме:

  • Структурированная потоковая передача Apache SparkDataFrame считывает первый файл (начальная версия), запускающий задание
  • , а затем присоединяется к нему, когда строки в таблице помечены данной версией (Spark SQL, пакетная обработка)
  • в конце, к которому добавляетсяновые столбцы в базе данных с новой версией для извлечения - но она также может обновляться на месте

enter image description here

Я написал простой код иллюстрации и нажална мой Github: https://github.com/bartosz25/spark-scala-playground/blob/master/src/test/scala/com/waitingforcode/stackoverflow/FeedbackOutputIntoInputSparkTest.scala

Код содержит «упрощенную» версию, записывающую каждую строку в отдельный запрос. Я не уверен, что он будет масштабироваться правильно, и вам, вероятно, следует предпочесть пакетные операции SQL.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...