Как отлаживать поток данных Python beam.io.WriteToBigQuery - PullRequest
0 голосов
/ 06 февраля 2019

В настоящее время у меня есть задание потока данных Python, конечным приемником которого является запись PCollection в BigQuery.Это происходит со следующей ошибкой:

Workflow failed. Causes: S01:XXXX+XXX+Write/WriteToBigQuery/NativeWrite failed., BigQuery import job "dataflow_job_XXXXXX" failed., BigQuery job "dataflow_job_XXXXXX" in project "XXXXXX" finished with error(s): errorResult: Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 19; errors: 1

Чтобы получить более подробный отчет об ошибке, я запускаю:

bq --format=prettyjson show -j dataflow_job_XXXXXX

, который отображает что-то вроде (есть куча ошибок, этоэто только один из них):

{

    "location": "gs://XXXXX/XXXXXX/tmp/XXXXX/10002237702794672370/dax-tmp-2019-02-05_20_14_50-18341731408970037725-S01-0-5144bb700f6a9f0b/-shard--try-00d3c2c24d5b0371-endshard.json",

    "message": "Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 11; errors: 1. Please look into the errors[] collection for more details.",                                          

    "reason": "invalid"

  },

Затем я иду искать конкретный осколок, чтобы увидеть, какая строка PCollection содержит ошибку и что мне нужно сделать, чтобы отфильтровать эти строки или исправить мою ошибку:

gsutil ls gs://XXXXX/XXXXXX/tmp/XXXXX/10002237702794672370/dax-tmp-2019-02-05_20_14_50-18341731408970037725-S01-0-5144bb700f6a9f0b/-shard--try-00d3c2c24d5b0371-endshard.json

Но эта команда возвращает:

CommandException: One or more URLs matched no objects.

Каковы оптимальные методы отладки заданий (которые занимают несколько часов между прочим)?Сейчас я думаю написать PCollection в GCS во временном каталоге в формате JSON и попробовать самому проглотить его.

1 Ответ

0 голосов
/ 07 февраля 2019

Для вашего типа ошибки я делаю следующее:

  1. Используйте инструмент проверки Json для вывода записей с ошибками.
  2. Локальный запуск облачного потока данных.
  3. Добавьте шаг конвейера, чтобы проверить каждую запись Json и удалить ошибочные записи из конвейера.Используйте файл мертвой буквы, используя побочный вывод, ИЛИ записывайте неверные записи для отладки.

Эта статья может дать вам несколько советов по обработке недопустимых входных данных.

Обработка недопустимых входных данных вПотоковый

...