Beam Job создает таблицу BigQuery, но не вставляет - PullRequest
0 голосов
/ 29 февраля 2020

Я пишу лучевую работу, которая представляет собой простой ETL 1: 1 из двоичного файла protobuf, хранящегося в GCS, в BigQuery. Схема таблицы довольно велика и генерируется автоматически из репрезентативного протобуфа.

Я сталкиваюсь с поведением, когда таблица BigQuery создается успешно, но записи не вставляются. Я подтвердил, что записи генерируются на более ранней стадии, и когда я использую обычный приемник файлов, я могу подтвердить, что записи записаны.

Кто-нибудь знает, почему это происходит?

Журналы :

WARNING:root:Inferring Schema...
WARNING:root:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
WARNING:root:Defining Beam Pipeline...
<PATH REDACTED>/venv/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1145: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
  experiments = p.options.view_as(DebugOptions).experiments or []
WARNING:root:Running Beam Pipeline...
WARNING:root:extracted {'counters': [MetricResult(key=MetricKey(step=extract_games, metric=MetricName(namespace=__main__.ExtractGameProtobuf, name=extracted_games), labels={}), committed=8, attempted=8)], 'distributions': [], 'gauges': []} games

Трубопровод Источник:

def main(args):
    DEFAULT_REPLAY_IDS_PATH = "./replay_ids.txt"

    DEFAULT_BQ_TABLE_OUT = "<PROJECT REDACTED>:<DATASET REDACTED>.games"

    # configure logging
    logging.basicConfig(level=logging.WARNING)

    # set up replay source
    replay_source = ETLReplayRemoteSource.default()

    # TODO: load the example replay and parse schema
    logging.warning("Inferring Schema...")
    sample_replay = replay_source.load_replay(DEFAULT_REPLAY_IDS[0])
    game_schema = ProtobufToBigQuerySchemaGenerator(
        sample_replay.analysis.DESCRIPTOR).schema()
    # print("GAME SCHEMA:\n{}".format(game_schema))  # DEBUG

    # submit beam job that reads replays into bigquery

    def count_ones(word_ones):
        (word, ones) = word_ones
        return (word, sum(ones))

    with beam.Pipeline(options=PipelineOptions()) as p:
        logging.warning("Defining Beam Pipeline...")
        # replay_ids = p | "create_replay_ids" >> beam.Create(DEFAULT_REPLAY_IDS)
        (p | "read_replay_ids" >> beam.io.ReadFromText(DEFAULT_REPLAY_IDS_PATH)
           | "extract_games" >> beam.ParDo(ExtractGameProtobuf())
           | "write_out_bq" >> WriteToBigQuery(
            DEFAULT_BQ_TABLE_OUT,
            schema=game_schema,
            write_disposition=BigQueryDisposition.WRITE_APPEND,
            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED)
         )

        logging.warning("Running Beam Pipeline...")
        result = p.run()
        result.wait_until_finish()
        n_extracted = result.metrics().query(
            MetricsFilter().with_name('extracted_games'))
        logging.warning("extracted {} games".format(n_extracted))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...