Как исправить «AttributeError: у объекта 'str' нет атрибута 'items'" в конвейере Dataflow, считывающем из PubSub и записывающем в BigQuery - PullRequest
0 голосов
/ 17 октября 2019

Я пытаюсь создать потоковый конвейер с Dataflow, который читает сообщения из темы PubSub и записывает их в таблицу BigQuery. Я не хочу использовать какой-либо шаблон. На данный момент я просто хочу создать конвейер в скрипте Python3, выполняемом из экземпляра виртуальной машины Google, чтобы выполнить этот простой процесс без какого-либо преобразования данных, поступающих из Pubsub (структура сообщений соответствует ожидаемой в таблице).

Сообщения, опубликованные в теме PubSub, выглядят следующим образом:

data = '{"A":1, "B":"Hey", "C":"You"}'
message = data.encode('utf-8')

Это функция, которую я использую для конвейера:

pipeline_options = PipelineOptions(pipeline_args = None, streaming = True, 
save_main_session = True)
parse_table_schema_from_json(json.dumps(json.load(open("schema.json"))))

# table_schema = ["fields" :[{"type":"INTEGER", "name":"A", 
# "mode":"REQUIRED"},{"type":"STRING", "name":"B", "mode":"NULLABLE"}, 
# {"type":"STRING", "name":"C", "mode":"NULLABLE"}]]


with beam.Pipeline(options=pipeline_options) as p:

    # Read the pubsub topic and write the menssage into a bigquery table

    message = ( p | beam.io.ReadFromPubSub(topic="projects/real- 
                    demand/topics/Test_API", subscription=None)
                  | beam.io.WriteToBigQuery(table = '$Table', dataset = 
                    '$Dataset', project = '$Project', schema = 
                    table_schema)
               )

У меня естьследующая ошибка:

 AttributeError: 'str' object has no attribute 'items'

1 Ответ

1 голос
/ 20 октября 2019

Вы переходите в string, а не JSON. Вам нужно будет проанализировать входную строку как json, как показано ниже

def parse_pubsub(line):
    import json
    record = json.loads(line)
    return (record['A']), (record['B']), (record['C'])

, и в вашем конвейере вам нужно будет сделать что-то вроде этого

  lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
                | beam.Map(parse_pubsub)
                | beam.Map(lambda (a_bq, b_bq, c_bq): {'A': a_bq, 'B': b_bq, 'C': c_bq})
                | beam.io.WriteToBigQuery(
                    known_args.output_table,
                    schema=' A:STRING, B:STRING, C:STRING',
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
            )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...