Как запустить несколько пакетов при потоковой передаче из источника HDFS? - PullRequest
0 голосов
/ 02 января 2019

У меня есть такой набор данных val df = spark.readStream.schema(s).parquet ("/path/to/file").where("Foo > 0").groupBy("bar").agg(expr("sum(Foo)")). Набор данных содержит более 1 миллиона записей, а файл Parquet содержит 1 раздел.

Я запускаю поток с df.writeStream.outputMode("update").format("console").start.

Затем Spark обрабатывает весь файл одновременно. Но я ожидаю, что Spark каким-то образом «разбивает» файл и обрабатывает каждое разбиение за раз при обновлении результата, так же, как пример обновления результата подсчета слов при вводе нового слова.

Я пытался добавить trigger(Trigger.ProcessingTime("x seconds")), но это не сработало.

1 Ответ

0 голосов
/ 03 января 2019

Затем Spark обрабатывает весь файл одновременно.Но я ожидаю, что Spark каким-то образом «разбивает» файл и обрабатывает каждое разбиение за раз при обновлении результата, точно так же, как пример обновления количества слов при вводе нового слова.

Вот какSpark Structured Streaming работает с файлами.Он обрабатывает их сразу и никогда не считает их снова.Он «разделяет» файл на части (ну, это должно быть в руках хранилища, например, HDFS, а не Spark), но это происходит под прикрытием.

Обратите внимание, что однаждыфайл обработан, файл больше никогда не будет обработан.

Я попытался добавить trigger(Trigger.ProcessingTime("x seconds")), но это не сработало.

Ну, так и было, но не какВы хотели.

DataStreamWriter.trigger устанавливает триггер для потокового запроса.Значением по умолчанию является ProcessingTime (0), и запрос будет выполнен максимально быстро.

Обратитесь к скаладоку DataStreamWriter .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...