Default libraries for DataFlow 3.x
0 votes
/ January 14, 2020

Versions

  • Python 3.5
  • Dataflow / Apache Beam [GCP] 2.17.0

My Python code contains the following line:

from googleapiclient import discovery

When the code runs, I get:

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 134, in main
    _load_main_session(semi_persistent_directory)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 237, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 307, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 410, in load_session
    module = unpickler.load()
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 825, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
ImportError: No module named 'googleapiclient'

Isn't this library supposed to be installed by default?

Starting the pipeline:

python3 dataflow/twitter_dataflow.py --input_subscription "projects/project-257304/subscriptions/project-twitter"  --requirements_file requirements.txt &^C
newsml-listener-1:/usr/local/src/ml_pipeline/dataflow# vim requirements.txt 
newsml-listener-1:/usr/local/src/ml_pipeline/dataflow# python3 twitter_dataflow.py --input_subscription "projects/project-257304/subscriptions/project-twitter"
/usr/local/lib/python3.5/dist-packages/apache_beam/io/gcp/bigquery.py:1217: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
  experiments = p.options.view_as(DebugOptions).experiments or []
INFO:root:Setting socket default timeout to 60 seconds.
INFO:root:socket default timeout is 60.0 seconds.
INFO:root:Starting GCS upload to gs://project-dev/project-twitter/staging/beamapp-root-0114000833-155259.1578960513.155431/pipeline.pb...
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Completed GCS upload to gs://project-dev/project-twitter/staging/beamapp-root-0114000833-155259.1578960513.155431/pipeline.pb in 0 seconds.
INFO:root:Starting GCS upload to gs://project-dev/project-twitter/staging/beamapp-root-0114000833-155259.1578960513.155431/pickled_main_session...
INFO:root:Completed GCS upload to gs://project-dev/project-twitter/staging/beamapp-root-0114000833-155259.1578960513.155431/pickled_main_session in 0 seconds.
INFO:root:Downloading source distribution of the SDK from PyPi
INFO:root:Executing command: ['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/tmpn56bqihm', 'apache-beam==2.17.0', '--no-deps', '--no-binary', ':all:']
INFO:root:Staging SDK sources from PyPI to gs://project-dev/project-twitter/staging/beamapp-root-0114000833-155259.1578960513.155431/dataflow_python_sdk.tar
INFO:root:Starting GCS upload to gs://project-dev/project-twitter/staging/beamapp-root-0114000833-155259.1578960513.155431/dataflow_python_sdk.tar...
INFO:root:Completed GCS upload to gs://project-dev/project-twitter/staging/beamapp-root-0114000833-155259.1578960513.155431/dataflow_python_sdk.tar in 0 seconds.
INFO:root:Downloading binary distribution of the SDK from PyPi
INFO:root:Executing command: ['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/tmpn56bqihm', 'apache-beam==2.17.0', '--no-deps', '--only-binary', ':all:', '--python-version', '35', '--implementation', 'cp', '--abi', 'cp35m', '--platform', 'manylinux1_x86_64']
INFO:root:Staging binary distribution of the SDK from PyPI to gs://project-dev/project-twitter/staging/beamapp-root-0114000833-155259.1578960513.155431/apache_beam-2.17.0-cp35-cp35m-manylinux1_x86_64.whl
INFO:root:Starting GCS upload to gs://project-dev/project-twitter/staging/beamapp-root-0114000833-155259.1578960513.155431/apache_beam-2.17.0-cp35-cp35m-manylinux1_x86_64.whl...
INFO:root:Completed GCS upload to gs://project-dev/project-twitter/staging/beamapp-root-0114000833-155259.1578960513.155431/apache_beam-2.17.0-cp35-cp35m-manylinux1_x86_64.whl in 1 seconds.
INFO:root:Create job: <Job
 createTime: '2020-01-14T00:08:49.061397Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2020-01-13_16_08_47-7502570004684079955'
 location: 'us-central1'
 name: 'beamapp-root-0114000833-155259'
 projectId: 'project-257304'
 stageStates: []
 startTime: '2020-01-14T00:08:49.061397Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)>
INFO:root:Created job with id: [2020-01-13_16_08_47-7502570004684079955]
INFO:root:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2020-01-13_16_08_47-7502570004684079955?project=project-257304
INFO:root:Job 2020-01-13_16_08_47-7502570004684079955 is in state JOB_STATE_PENDING
INFO:root:2020-01-14T00:08:52.568Z: JOB_MESSAGE_DETAILED: Checking permissions granted to controller Service Account.
INFO:root:2020-01-14T00:08:54.190Z: JOB_MESSAGE_BASIC: Worker configuration: n1-standard-4 in us-central1-a.
INFO:root:2020-01-14T00:08:55.324Z: JOB_MESSAGE_DETAILED: Expanding SplittableParDo operations into optimizable parts.
INFO:root:2020-01-14T00:08:55.332Z: JOB_MESSAGE_DETAILED: Expanding CollectionToSingleton operations into optimizable parts.
INFO:root:2020-01-14T00:08:55.355Z: JOB_MESSAGE_DETAILED: Expanding CoGroupByKey operations into optimizable parts.
INFO:root:2020-01-14T00:08:55.429Z: JOB_MESSAGE_DETAILED: Expanding SplittableProcessKeyed operations into optimizable parts.
INFO:root:2020-01-14T00:08:55.439Z: JOB_MESSAGE_DETAILED: Expanding GroupByKey operations into streaming Read/Write steps
INFO:root:2020-01-14T00:08:55.477Z: JOB_MESSAGE_DEBUG: Annotating graph with Autotuner information.
INFO:root:2020-01-14T00:08:55.519Z: JOB_MESSAGE_DETAILED: Fusing adjacent ParDo, Read, Write, and Flatten operations
INFO:root:2020-01-14T00:08:55.528Z: JOB_MESSAGE_DETAILED: Fusing consumer pair with key into predict sentiment
INFO:root:2020-01-14T00:08:55.534Z: JOB_MESSAGE_DETAILED: Fusing consumer store twitter posts/_StreamToBigQuery/AppendDestination into predict sentiment
INFO:root:2020-01-14T00:08:55.542Z: JOB_MESSAGE_DETAILED: Fusing consumer assign window key into read in tweets/Read
INFO:root:2020-01-14T00:08:55.558Z: JOB_MESSAGE_DETAILED: Fusing consumer batch into n batches/ParDo(_WindowAwareBatchingDoFn) into assign window key
INFO:root:2020-01-14T00:08:55.568Z: JOB_MESSAGE_DETAILED: Fusing consumer predict sentiment into batch into n batches/ParDo(_WindowAwareBatchingDoFn)
INFO:root:2020-01-14T00:08:55.576Z: JOB_MESSAGE_DETAILED: Fusing consumer store twitter posts/_StreamToBigQuery/AddInsertIds into store twitter posts/_StreamToBigQuery/AppendDestination
INFO:root:2020-01-14T00:08:55.580Z: JOB_MESSAGE_DETAILED: Fusing consumer store twitter posts/_StreamToBigQuery/CommitInsertIds/AddRandomKeys into store twitter posts/_StreamToBigQuery/AddInsertIds
INFO:root:2020-01-14T00:08:55.586Z: JOB_MESSAGE_DETAILED: Fusing consumer store twitter posts/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/Map(reify_timestamps) into store twitter posts/_StreamToBigQuery/CommitInsertIds/AddRandomKeys
INFO:root:2020-01-14T00:08:55.591Z: JOB_MESSAGE_DETAILED: Fusing consumer store twitter posts/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/GroupByKey/WriteStream into store twitter posts/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/Map(reify_timestamps)
INFO:root:2020-01-14T00:08:55.596Z: JOB_MESSAGE_DETAILED: Fusing consumer store twitter posts/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/GroupByKey/MergeBuckets into store twitter posts/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/GroupByKey/ReadStream
INFO:root:2020-01-14T00:08:55.603Z: JOB_MESSAGE_DETAILED: Fusing consumer store twitter posts/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/FlatMap(restore_timestamps) into store twitter posts/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/GroupByKey/MergeBuckets
INFO:root:2020-01-14T00:08:55.612Z: JOB_MESSAGE_DETAILED: Fusing consumer store twitter posts/_StreamToBigQuery/CommitInsertIds/RemoveRandomKeys into store twitter posts/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/FlatMap(restore_timestamps)
INFO:root:2020-01-14T00:08:55.618Z: JOB_MESSAGE_DETAILED: Fusing consumer store twitter posts/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn) into store twitter posts/_StreamToBigQuery/CommitInsertIds/RemoveRandomKeys
INFO:root:2020-01-14T00:08:55.625Z: JOB_MESSAGE_DETAILED: Fusing consumer group by key/WriteStream into pair with key
INFO:root:2020-01-14T00:08:55.635Z: JOB_MESSAGE_DETAILED: Fusing consumer group by key/MergeBuckets into group by key/ReadStream
INFO:root:2020-01-14T00:08:55.641Z: JOB_MESSAGE_DETAILED: Fusing consumer aggregate and format into group by key/MergeBuckets
INFO:root:2020-01-14T00:08:55.646Z: JOB_MESSAGE_DETAILED: Fusing consumer store aggregated sentiment/_StreamToBigQuery/AppendDestination into aggregate and format
INFO:root:2020-01-14T00:08:55.654Z: JOB_MESSAGE_DETAILED: Fusing consumer store aggregated sentiment/_StreamToBigQuery/AddInsertIds into store aggregated sentiment/_StreamToBigQuery/AppendDestination
INFO:root:2020-01-14T00:08:55.658Z: JOB_MESSAGE_DETAILED: Fusing consumer store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/AddRandomKeys into store aggregated sentiment/_StreamToBigQuery/AddInsertIds
INFO:root:2020-01-14T00:08:55.662Z: JOB_MESSAGE_DETAILED: Fusing consumer store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/Map(reify_timestamps) into store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/AddRandomKeys
INFO:root:2020-01-14T00:08:55.665Z: JOB_MESSAGE_DETAILED: Fusing consumer store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/GroupByKey/WriteStream into store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/Map(reify_timestamps)
INFO:root:2020-01-14T00:08:55.668Z: JOB_MESSAGE_DETAILED: Fusing consumer store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/GroupByKey/MergeBuckets into store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/GroupByKey/ReadStream
INFO:root:2020-01-14T00:08:55.672Z: JOB_MESSAGE_DETAILED: Fusing consumer store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/FlatMap(restore_timestamps) into store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/GroupByKey/MergeBuckets
INFO:root:2020-01-14T00:08:55.679Z: JOB_MESSAGE_DETAILED: Fusing consumer store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/RemoveRandomKeys into store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/FlatMap(restore_timestamps)
INFO:root:2020-01-14T00:08:55.688Z: JOB_MESSAGE_DETAILED: Fusing consumer store aggregated sentiment/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn) into store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/RemoveRandomKeys
INFO:root:2020-01-14T00:08:55.717Z: JOB_MESSAGE_DEBUG: Adding StepResource setup and teardown to workflow graph.
INFO:root:2020-01-14T00:08:55.770Z: JOB_MESSAGE_DEBUG: Adding workflow start and stop steps.
INFO:root:2020-01-14T00:08:55.821Z: JOB_MESSAGE_DEBUG: Assigning stage ids.
INFO:root:2020-01-14T00:08:56.038Z: JOB_MESSAGE_DEBUG: Executing wait step start2
INFO:root:2020-01-14T00:08:56.079Z: JOB_MESSAGE_DEBUG: Starting worker pool setup.
INFO:root:2020-01-14T00:08:56.095Z: JOB_MESSAGE_BASIC: Starting 1 workers...
INFO:root:2020-01-14T00:08:58.353Z: JOB_MESSAGE_BASIC: Executing operation store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/GroupByKey/ReadStream+store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/GroupByKey/MergeBuckets+store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/FlatMap(restore_timestamps)+store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/RemoveRandomKeys+store aggregated sentiment/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)
INFO:root:2020-01-14T00:08:58.356Z: JOB_MESSAGE_BASIC: Executing operation store twitter posts/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/GroupByKey/ReadStream+store twitter posts/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/GroupByKey/MergeBuckets+store twitter posts/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/FlatMap(restore_timestamps)+store twitter posts/_StreamToBigQuery/CommitInsertIds/RemoveRandomKeys+store twitter posts/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)
INFO:root:2020-01-14T00:08:58.356Z: JOB_MESSAGE_BASIC: Executing operation group by key/ReadStream+group by key/MergeBuckets+aggregate and format+store aggregated sentiment/_StreamToBigQuery/AppendDestination+store aggregated sentiment/_StreamToBigQuery/AddInsertIds+store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/AddRandomKeys+store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/Map(reify_timestamps)+store aggregated sentiment/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/GroupByKey/WriteStream
INFO:root:2020-01-14T00:08:58.357Z: JOB_MESSAGE_BASIC: Executing operation read in tweets/Read+assign window key+batch into n batches/ParDo(_WindowAwareBatchingDoFn)+predict sentiment+pair with key+store twitter posts/_StreamToBigQuery/AppendDestination+store twitter posts/_StreamToBigQuery/AddInsertIds+store twitter posts/_StreamToBigQuery/CommitInsertIds/AddRandomKeys+store twitter posts/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/Map(reify_timestamps)+store twitter posts/_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/GroupByKey/WriteStream+group by key/WriteStream
INFO:root:Job 2020-01-13_16_08_47-7502570004684079955 is in state JOB_STATE_RUNNING
INFO:root:2020-01-14T00:09:23.143Z: JOB_MESSAGE_DETAILED: Checking permissions granted to controller Service Account.
INFO:root:2020-01-14T00:09:23.148Z: JOB_MESSAGE_DEBUG: Executing input step topology_init_attach_disk_input_step
INFO:root:2020-01-14T00:09:23.873Z: JOB_MESSAGE_BASIC: Worker configuration: n1-standard-4 in us-central1-a.
INFO:root:2020-01-14T00:09:42.039Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
INFO:root:2020-01-14T00:15:23.137Z: JOB_MESSAGE_DETAILED: Checking permissions granted to controller Service Account.
INFO:root:2020-01-14T00:21:23.137Z: JOB_MESSAGE_DETAILED: Checking permissions granted to controller Service Account.
INFO:root:2020-01-14T00:27:23.137Z: JOB_MESSAGE_DETAILED: Checking permissions granted to controller Service Account.

1 Answer

1 vote
/ January 15, 2020

As you can see from the documentation provided, for Python SDK version 2.17.0 running on Python 3.5.7, the googleapiclient package is not installed by default.
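
As a sanity check (this is just a debugging sketch, not something from the documentation), you can list the packages that actually exist on the workers by running pip freeze inside a trivial transform and reading the output from the worker logs:

# Debugging sketch (an assumption, not an official recipe): print the
# packages available inside the Dataflow worker container.
import subprocess

import apache_beam as beam


def list_worker_packages(_):
    # Runs on the worker, so the output reflects the worker environment,
    # not your local machine ('pip' may be 'pip3' depending on the image).
    frozen = subprocess.check_output(['pip', 'freeze']).decode('utf-8')
    return frozen.splitlines()


with beam.Pipeline() as p:  # pass your usual Dataflow options here
    (p
     | beam.Create([None])
     | beam.FlatMap(list_worker_packages)
     | beam.Map(print))  # appears in the worker logs on Dataflow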

If you want to install this package on your worker nodes, you can follow the Apache Beam documentation on managing Python pipeline dependencies.

Install the package on your local machine:

pip install google-api-python-client

Then find out which packages are installed on your machine. Run the following command:

pip freeze > requirements.txt

This command creates a requirements.txt file that lists all of the packages installed on your machine.

Edit the requirements.txt file, keeping only the packages that were installed from PyPI and are actually used in your workflow source. Remove any packages that are not relevant to your code.
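
For the pipeline in the question, a trimmed requirements.txt might end up containing little more than the missing package itself (the version pin below is purely illustrative):

google-api-python-client==1.7.11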

Run your pipeline with the following command-line option, which tells the service to use the file to install the additional dependencies on the remote workers:

--requirements_file requirements.txt

For example:

python prediction/run.py \
    --runner DataflowRunner \
    --project $PROJECT \
    --staging_location $BUCKET/staging \
    --temp_location $BUCKET/temp \
    --job_name $PROJECT-prediction-cs \
    --setup_file prediction/setup.py \
    --model $BUCKET/model \
    --source cs \
    --input $BUCKET/input/images.txt \
    --output $BUCKET/output/predict \
    --requirements_file requirements.txt
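
If you prefer to keep the flag out of the shell command, the same option can be set from code via Beam's SetupOptions; a minimal sketch, assuming requirements.txt sits next to the script:

# Minimal sketch: the programmatic equivalent of --requirements_file.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

options = PipelineOptions()  # plus your usual runner/project/staging flags
# SetupOptions.requirements_file mirrors the --requirements_file flag;
# the listed packages are installed on each remote worker at startup.
options.view_as(SetupOptions).requirements_file = 'requirements.txt'

with beam.Pipeline(options=options) as p:
    ...  # your existing transforms go here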

Hope this helps.
