Pyspark - структурированный источник потокового файла - PullRequest
0 голосов
/ 12 апреля 2020

Попытались прочитать входные данные из файлов JSON, используя поток чтения Pyspark, и записать результат в новый файл JSON, используя метод foreachBatch. Применяя некоторую агрегацию к данным, чтобы использовать используемый режим.

Проблема заключается в том, что потоковая передача не считывает новые файлы, сброшенные позднее во входной каталог, она обрабатывает только те файлы, которые находились во входном каталоге. когда он начался.

Может видеть только предупреждающее сообщение, когда есть новый входной файл и потоковая передача была выполнена, но в выходном каталоге не создан новый файл JSON.

Сообщение: WARN FileStreamSource: Перечислено 2 файла (ов) за 36895.685 мс

Я выполняю код из записной книжки Jupyter, мог видеть только указанное выше предупреждающее сообщение. Проверено lastProgess обработчика потока, он обновлен с отметкой времени, но numInputRows равен 0.

Насколько я понимаю, структурированная потоковая передача будет продолжать смотреть на входной каталог и обрабатывать новые файлы на основе ограничения, определенного для потоковой передачи. , В моем коде определены ограничения maxFilePerTrigger, а триггер основан на времени обработки.

Пожалуйста, помогите мне решить эту проблему.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...