У меня возникла проблема при попытке запустить конвейер GCP Cloud-Dataflow.
Конвейер работает при локальном запуске с использованием "DirectRunner", но завершается ошибкой при попытке запуска в потоке данных с помощью "DataflowRunner".
Сбой немедленно при вызове run()
в конвейере с сообщением об ошибке, приведенным выше (в отличие от первого развертывания в GCP, а затем сбой при фактическом запуске конвейера).
Исключение выдается при вызове beam.io.WriteToBigQuery
:
(bq_rows
| 'map_to_row' >> beam.Map(to_pred_row)
| 'write_to_table' >> beam.io.WriteToBigQuery(
'my_dataset_name.my_table_name',
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
Если я заменю последний узел в конвейере чем-то, что просто записывает в файл:
(bq_rows
| 'map_to_row' >> beam.Map(to_pred_row)
| 'debug_write_to_csv_2' >> beam.io.WriteToText(additional_opts.out_path, ".txt"))
Тогда все работает как положено, и я получаю текстовый файл со всеми записями, которые я ожидал.
Если я запускаю все как есть с функцией WriteToBigQuery()
, но меняю обратно на DirectRunner
(и ничего не меняю), тогда все работает, и новые строки записываются в таблицу BQ.
Насколько я могу судить, нет ничего примечательного в том, что записи поступают в
WriteToBigQuery
узел. Я вывел их в текстовый файл, работающий как локально, так и в облаке, чтобы выделить причину этой ошибки, но оба вывода выглядят одинаково (и соответствуют схеме таблицы назначения). В любом случае не похоже, что при запуске потока все становится слишком далеко от неожиданного значения или параметра - как уже упоминалось, эта ошибка возникает всякий раз, когда я вызываю run()
в конвейере
Куда я иду не так?
UPDATE:
Вот минимальный пример того же поведения. Создав таблицу с именем temp_e.words
с одним (STRING, REQUIRED) столбцом с именем word
, я могу воспроизвести поведение с помощью этого кода:
import apache_beam as beam
from google.cloud import storage as gcs
import shutil
from google.cloud import bigquery as bq
import datetime
import os
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import (GoogleCloudOptions,
StandardOptions)
def to_row(word):
return {
'word': word
}
def run_pipeline(local_mode):
PROJECT = 'searchlab-data-insights'
REGION = 'us-central1'
GCS_BUCKET_PATH = 'gs://temp-staging-e'
timestamp = datetime.datetime.now().strftime('%y%m%d-%H%M%S')
options = beam.pipeline.PipelineOptions([
'--project', PROJECT
])
if local_mode:
RUNNER = 'DirectRunner'
else:
RUNNER = 'DataflowRunner'
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT
google_cloud_options.job_name = 'test-{}'.format(timestamp)
google_cloud_options.staging_location = os.path.join(GCS_BUCKET_PATH, 'staging')
google_cloud_options.temp_location = os.path.join(GCS_BUCKET_PATH, 'tmp')
options.view_as(StandardOptions).runner = RUNNER
p = beam.Pipeline(RUNNER, options=options)
bq_rows = p | beam.Create(['words', 'to', 'store'])
(bq_rows
| 'map_to_row' >> beam.Map(to_row)
| 'write_to_table' >> beam.io.WriteToBigQuery(
'temp_e.words',
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
job = p.run()
if local_mode:
job.wait_until_finish()
print "Done!"
Теперь выполнение run_pipeline(local_mode=True)
дает правильный результат, и строки добавляются, тогда как выполнение run_pipeline(local_mode=False)
немедленно вызывает ошибку.
Полная сгенерированная ошибка здесь: https://pastebin.com/xx8wwtXV