У меня есть простой конвейер потока данных, который успешно работает на моей локальной машине:
import argparse
import logging
import ast
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery
def parse_args_set_logging(argv=None):
"""
parse command line arguments
"""
parser = argparse.ArgumentParser()
parser.add_argument('--verbose',
action='store_true',
help='set the logging level to debug')
parser.add_argument('--topic',
default=<my topic>,
help='GCP pubsub topic to subscribe to')
known_args, pipeline_args = parser.parse_known_args(argv)
# set logging level
logging.basicConfig()
if known_args.verbose:
logging.getLogger().setLevel(logging.INFO)
return known_args, pipeline_args
class formatForBigQueryDoFn(beam.DoFn):
def record_handler(self, data):
"""
Build a dictionary ensuring format matches BigQuery table schema
"""
return {
"uid": data['uid'],
"interaction_type": data['interaction_type'],
"interaction_asset_id": data['interaction_asset_id'],
"interaction_value": data['interaction_value'],
"timestamp": data['timestamp'],
}
def process(self, element):
# extract data from the PubsubMessage python object and convert to python dict
data = ast.literal_eval(element.data)
logging.info("ELEMENT OBJECT: {}".format(data))
# format the firestore timestamp for bigquery
data['timestamp'] = data['timestamp']['_seconds']
# construct the data for bigquery
result = self.record_handler(data)
return [result]
if __name__ == '__main__':
known_args, pipeline_args = parse_args_set_logging()
# create a pipeline object
pipeline_options = GoogleCloudOptions(pipeline_args)
p = beam.Pipeline(options=pipeline_options)
# create a PCollection from the GCP pubsub topic
inputCollection = p | beam.io.ReadFromPubSub(
topic=known_args.topic,
# id_label='id', # unique identifier in each record to be processed
with_attributes=True, # output PubsubMessage objects
)
# chain together multiple transform methods, to create a new PCollection
OutputCollection = inputCollection | beam.ParDo(formatForBigQueryDoFn())
# write the resulting PCollection to BigQuery
table_spec = <my table spec>
table_schema = 'uid:STRING, interaction_type:STRING, interaction_asset_id:STRING, interaction_value:STRING, timestamp:TIMESTAMP'
OutputCollection | beam.io.WriteToBigQuery(
table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
# run the pipeline
result = p.run().wait_until_finish()
Я пытаюсь запустить этот код, используя поток данных GCP. Для этого мне нужно установить зависимость Python, AST
. Я попытался создать requirements.txt
и использовать аргумент --requirements_file
, но безуспешно. Я сейчас пытаюсь с setup.py
. После документов мой setup.py
выглядит так:
import setuptools
setuptools.setup(
name='pubsub_to_BQ',
version='1.0',
install_requires=[
'AST'
],
packages=setuptools.find_packages(),
)
Я работаю на GCP со следующей командой:
python main.py --runner DataflowRunner \
--setup_file ./setup.py \
--project <myproject> \
--temp_location <my bucket> \
--verbose \
--streaming \
--job_name bigqueryinteractions
Однако, когда конвейер обрабатывает данные, я получаю следующую ошибку:
File "main.py", line 47, in process
NameError: global name 'ast' is not defined [while running 'generatedPtransform-54']
Как я могу решить это?