Python устанавливает зависимость потока данных, но не может импортировать - PullRequest
0 голосов
/ 21 марта 2019

У меня есть простой конвейер потока данных, который успешно работает на моей локальной машине:

import argparse
import logging
import ast
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery


def parse_args_set_logging(argv=None):
    """
    parse command line arguments
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--verbose',
                        action='store_true',
                        help='set the logging level to debug')
    parser.add_argument('--topic',
                        default=<my topic>,
                        help='GCP pubsub topic to subscribe to')

    known_args, pipeline_args = parser.parse_known_args(argv)

    # set logging level
    logging.basicConfig()
    if known_args.verbose:
        logging.getLogger().setLevel(logging.INFO)

    return known_args, pipeline_args


class formatForBigQueryDoFn(beam.DoFn):
    def record_handler(self, data):
        """
        Build a dictionary ensuring format matches BigQuery table schema
        """
        return {
            "uid": data['uid'],
            "interaction_type": data['interaction_type'],
            "interaction_asset_id": data['interaction_asset_id'],
            "interaction_value": data['interaction_value'],
            "timestamp": data['timestamp'],
        }

    def process(self, element):

        # extract data from the PubsubMessage python object and convert to python dict
        data = ast.literal_eval(element.data)
        logging.info("ELEMENT OBJECT: {}".format(data))

        # format the firestore timestamp for bigquery
        data['timestamp'] = data['timestamp']['_seconds']

        # construct the data for bigquery
        result = self.record_handler(data)
        return [result]


if __name__ == '__main__':
    known_args, pipeline_args = parse_args_set_logging()

    # create a pipeline object
    pipeline_options = GoogleCloudOptions(pipeline_args)
    p = beam.Pipeline(options=pipeline_options)

    # create a PCollection from the GCP pubsub topic
    inputCollection = p | beam.io.ReadFromPubSub(
        topic=known_args.topic,
        # id_label='id',  # unique identifier in each record to be processed
        with_attributes=True,  # output PubsubMessage objects
    )

    # chain together multiple transform methods, to create a new PCollection
    OutputCollection = inputCollection | beam.ParDo(formatForBigQueryDoFn())

    # write the resulting PCollection to BigQuery
    table_spec = <my table spec>
    table_schema = 'uid:STRING, interaction_type:STRING, interaction_asset_id:STRING, interaction_value:STRING, timestamp:TIMESTAMP'

    OutputCollection | beam.io.WriteToBigQuery(
        table_spec,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

    # run the pipeline
    result = p.run().wait_until_finish()

Я пытаюсь запустить этот код, используя поток данных GCP. Для этого мне нужно установить зависимость Python, AST. Я попытался создать requirements.txt и использовать аргумент --requirements_file, но безуспешно. Я сейчас пытаюсь с setup.py. После документов мой setup.py выглядит так:

import setuptools

setuptools.setup(
    name='pubsub_to_BQ',
    version='1.0',
    install_requires=[
        'AST'
    ],
    packages=setuptools.find_packages(),
)

Я работаю на GCP со следующей командой:

python main.py --runner DataflowRunner \
               --setup_file ./setup.py \
               --project <myproject> \
               --temp_location <my bucket> \
               --verbose \
               --streaming \
               --job_name bigqueryinteractions

Однако, когда конвейер обрабатывает данные, я получаю следующую ошибку:

File "main.py", line 47, in process NameError: global name 'ast' is not defined [while running 'generatedPtransform-54']

Как я могу решить это?

Ответы [ 2 ]

1 голос
/ 23 марта 2019

AFAIK, если вы укажете setup.py через командную строку оболочки, тогда вы должны использовать абсолютный путь, также с Dataflow попробуйте логический флаг save_main_session, так как без него ваш развернутый шаблон не разрешит зависимости, указанные в setup.py.

Параметры, которые не являются динамическими для вашего конвейера, могут быть определены во время построения конвейера.

Например, таким образом вы можете hardcode некоторые из неизменяемых аргументов, которые вам нужнывсегда передавать, поэтому вам нужно только указать аргументы, которые изменяются от выполнения к выполнению:

known_args, pipe_args = parser.parse_known_args()
standard_pipe_arg = ['--save_main_session', 'setup_file=./setup.py', '--streaming']
pipe_opts = PipelineOptions(pipe_args + standard_pipe_arg)
0 голосов
/ 21 марта 2019

Я нашел обходной путь, используя json libary вместо ast. Я все еще хотел бы знать, что я делаю здесь неправильно.

...