Облачный поток данных Google - WriteToBigQuery: объект 'NoneType' не имеет атрибута '__getitem__' - PullRequest
0 голосов
/ 01 мая 2019

У меня возникла проблема при попытке запустить конвейер GCP Cloud-Dataflow.

Конвейер работает при локальном запуске с использованием "DirectRunner", но завершается ошибкой при попытке запуска в потоке данных с помощью "DataflowRunner".

Сбой немедленно при вызове run() в конвейере с сообщением об ошибке, приведенным выше (в отличие от первого развертывания в GCP, а затем сбой при фактическом запуске конвейера).

Исключение выдается при вызове beam.io.WriteToBigQuery:

(bq_rows 
| 'map_to_row' >> beam.Map(to_pred_row)
| 'write_to_table' >> beam.io.WriteToBigQuery(
    'my_dataset_name.my_table_name', 
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

Если я заменю последний узел в конвейере чем-то, что просто записывает в файл:

(bq_rows 
| 'map_to_row' >> beam.Map(to_pred_row)
| 'debug_write_to_csv_2' >> beam.io.WriteToText(additional_opts.out_path, ".txt"))

Тогда все работает как положено, и я получаю текстовый файл со всеми записями, которые я ожидал.

Если я запускаю все как есть с функцией WriteToBigQuery(), но меняю обратно на DirectRunner (и ничего не меняю), тогда все работает, и новые строки записываются в таблицу BQ.

Насколько я могу судить, нет ничего примечательного в том, что записи поступают в WriteToBigQuery узел. Я вывел их в текстовый файл, работающий как локально, так и в облаке, чтобы выделить причину этой ошибки, но оба вывода выглядят одинаково (и соответствуют схеме таблицы назначения). В любом случае не похоже, что при запуске потока все становится слишком далеко от неожиданного значения или параметра - как уже упоминалось, эта ошибка возникает всякий раз, когда я вызываю run() в конвейере

Куда я иду не так?


UPDATE:

Вот минимальный пример того же поведения. Создав таблицу с именем temp_e.words с одним (STRING, REQUIRED) столбцом с именем word, я могу воспроизвести поведение с помощью этого кода:

import apache_beam as beam
from google.cloud import storage as gcs
import shutil
from google.cloud import bigquery as bq
import datetime
import os
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import (GoogleCloudOptions, 
                                                  StandardOptions)


def to_row(word):
  return {
      'word': word
  }

def run_pipeline(local_mode):

  PROJECT = 'searchlab-data-insights'
  REGION = 'us-central1'
  GCS_BUCKET_PATH = 'gs://temp-staging-e'

  timestamp = datetime.datetime.now().strftime('%y%m%d-%H%M%S')

  options = beam.pipeline.PipelineOptions([
      '--project', PROJECT
  ])

  if local_mode:
    RUNNER = 'DirectRunner'
  else:
    RUNNER = 'DataflowRunner'

  google_cloud_options = options.view_as(GoogleCloudOptions)
  google_cloud_options.project = PROJECT
  google_cloud_options.job_name = 'test-{}'.format(timestamp)
  google_cloud_options.staging_location = os.path.join(GCS_BUCKET_PATH, 'staging')
  google_cloud_options.temp_location = os.path.join(GCS_BUCKET_PATH, 'tmp')
  options.view_as(StandardOptions).runner = RUNNER

  p = beam.Pipeline(RUNNER, options=options)
  bq_rows = p | beam.Create(['words', 'to', 'store']) 

  (bq_rows 
    | 'map_to_row' >> beam.Map(to_row)
    | 'write_to_table' >> beam.io.WriteToBigQuery(
        'temp_e.words', 
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
  )

  job = p.run()

  if local_mode:
    job.wait_until_finish()
    print "Done!"

Теперь выполнение run_pipeline(local_mode=True) дает правильный результат, и строки добавляются, тогда как выполнение run_pipeline(local_mode=False) немедленно вызывает ошибку.

Полная сгенерированная ошибка здесь: https://pastebin.com/xx8wwtXV

1 Ответ

1 голос
/ 02 мая 2019

Эта проблема возникает только в том случае, если для вызова beam.io.WriteToBigQuery не предоставлена ​​схема.Кажется, что DirectRunner может работать с использованием существующей схемы таблиц, но DataflowRunner не может.

при отсутствии лучшего ответа, мы можем обойти это, явно предоставив схему.

Так, например, в приведенном выше минимальном примере мы могли бы использовать это:

(bq_rows 
| 'map_to__row' >> beam.Map(to_row)
| 'write_to_table' >> beam.io.WriteToBigQuery(
    'temp_e.words',
    <b>schema={"fields":[{"type":"STRING","name":"word","mode":"REQUIRED"}]}</b>
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)

...