Как исправить 'Отсутствует выполнение информации о проекте.Пожалуйста, используйте --project ' - PullRequest
0 голосов
/ 26 сентября 2019

Я пытаюсь получить все данные из моей таблицы в bigquery, но я получаю сообщение об ошибке при запуске кода.

Я пробовал пример кода из Apache Beam для чтения из BigQuery, а такжекод из среднего поста.

Луч: https://beam.apache.org/documentation/io/built-in/google-bigquery/

Средний пост: https://medium.com/google-cloud/how-to-run-python-code-on-your-bigquery-table-1bbd78c69351

Вот мой код:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.internal.clients.bigquery import bigquery_v2_messages

# google project
project_id = "project-123456"

# bigquery
dataset_id = "my_dataset"
table_id = "my_table"

look_up_table_schema = {
    'fields': [
        {'name': 'id', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'pathname', 'type': 'STRING', 'mode': 'NULLABLE'},
    ]}

look_up_table_spec = bigquery_v2_messages.TableReference(
    projectId=project_id,
    datasetId=dataset_id,
    tableId=table_id)


def printer(element):
    print()
    print("printer:")
    pprint(element)
    print()


if __name__ == '__main__':
    import os

    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./cred_file/cred_file.json"

    import warnings

    warnings.filterwarnings("ignore")

    options = PipelineOptions()  # create and set your PipelineOptions

    options.view_as(StandardOptions).runner = 'DirectRunner'
    options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=options) as p:  # Create the Pipeline with the specified options.

        query = "select id, pathname from '%s.%s.%s'" % (project_id, dataset_id, table_id)
        print(query)

        get_look_up_table = (p
                             | 'Read from bigquery' >> beam.io.Read(beam.io.BigQuerySource(table=look_up_table_spec))
                             | 'Id to pathname' >> beam.Map((lambda row: (row['id'], row['pathname'])))
                             | 'printer' >> beam.ParDo(printer)
                             )

        # get_look_up_table = (p
        #                      | 'Read from bigquery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
        #                      | 'Id to pathname' >> beam.Map((lambda row: (row['id'], row['pathname'])))
        #                      | 'printer' >> beam.ParDo(printer)
        #                      )

Project_id, dataset_id и table_id имеют поддельные значения, но таблица, из которой я пытаюсь получить данные, существует.

Я получаю эту ошибку при запуске конвейера:

ERROR:root:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7ff0e1dce500>, due to an exception.
 Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/direct/executor.py", line 343, in call
    finish_state)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/direct/executor.py", line 383, in attempt_call
    result = evaluator.finish_bundle()
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/direct/transform_evaluator.py", line 318, in finish_bundle
    with self._source.reader() as reader:
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 495, in reader
    kms_key=self.kms_key)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/gcp/bigquery_tools.py", line 855, in __init__
    'Missing executing project information. Please use the --project '
RuntimeError: Missing executing project information. Please use the --project command line option to specify it.

Что я ожидаюэто получить содержание этой таблицы: https://imgur.com/viN0PYS

Как я могу решить эту проблему?Заранее спасибо!

1 Ответ

2 голосов
/ 26 сентября 2019

Решил проблему!

Вместо этого:

options = PipelineOptions()  # create and set your PipelineOptions

options.view_as(StandardOptions).runner = 'DirectRunner'
options.view_as(StandardOptions).streaming = True```

Я сделал это:

options = {
    'project': project_id,
    'runner:': 'DirectRunner',
    'streaming': True
}

options = PipelineOptions(flags=[], **options)  # create and set your PipelineOptions
...