Я пытаюсь создать потоковый конвейер с Dataflow, который читает сообщения из темы PubSub и записывает их в таблицу BigQuery. Я не хочу использовать какой-либо шаблон. На данный момент я просто хочу создать конвейер в скрипте Python3, выполняемом из экземпляра виртуальной машины Google, чтобы выполнить этот простой процесс без какого-либо преобразования данных, поступающих из Pubsub (структура сообщений соответствует ожидаемой в таблице).
Сообщения, опубликованные в теме PubSub, выглядят следующим образом:
data = '{"A":1, "B":"Hey", "C":"You"}'
message = data.encode('utf-8')
Это функция, которую я использую для конвейера:
pipeline_options = PipelineOptions(pipeline_args = None, streaming = True,
save_main_session = True)
parse_table_schema_from_json(json.dumps(json.load(open("schema.json"))))
# table_schema = ["fields" :[{"type":"INTEGER", "name":"A",
# "mode":"REQUIRED"},{"type":"STRING", "name":"B", "mode":"NULLABLE"},
# {"type":"STRING", "name":"C", "mode":"NULLABLE"}]]
with beam.Pipeline(options=pipeline_options) as p:
# Read the pubsub topic and write the menssage into a bigquery table
message = ( p | beam.io.ReadFromPubSub(topic="projects/real-
demand/topics/Test_API", subscription=None)
| beam.io.WriteToBigQuery(table = '$Table', dataset =
'$Dataset', project = '$Project', schema =
table_schema)
)
У меня естьследующая ошибка:
AttributeError: 'str' object has no attribute 'items'