Google Dataflow: глобальное имя не определено - apache beam - PullRequest
0 голосов
/ 16 ноября 2018

В локальной системе у меня есть это:

from shapely.geometry import Point
class GeoDataIngestion:
    def parse_method(self, string_input):
       Place = Point(float(values[2]), float(values[3]))

Я запускаю это, с Python 2.7 и все идет хорошо

После этого я пытаюсь проверить его с помощью бегуна потока данных, но во время работыЯ получил эту ошибку:

NameError: global name 'Point' is not defined


geo_data = (raw_data
                    | 'Geo data transform' >> beam.Map(lambda s: geo_ingestion.parse_method(s))

Я прочитал другой пост , и я думаю, что это должно работать,но я не уверен, есть ли что-то особенное с Google Dataflow в этом

Я также пытался:

import shapely.geometry
Place = shapely.geometry.Point(float(values[2]), float(values[3]))

С тем же результатом

NameError: global name 'shapely' is not defined

Любая идея?

В Google Cloud, если я попробую в своей виртуальной среде, я могу сделать это без проблем:

(env) ...@cloudshell:~ ()$ python
Python 2.7.13 (default, Sep 26 2018, 18:42:22)
[GCC 6.3.0 20170516] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from shapely.geometry import Point
Var = Point(-5.020751953125, 39.92237576385941)


Ошибка при использовании needs.txt

Collecting Shapely==1.6.4.post1 (from -r req.txt (line 2))
  Using cached
  Saved c:\<...>\shapely-1.6.4.post1.tar.gz
    Complete output from command python egg_info:
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "c:\<...>\temp\pip-download-kpg5ca\Shapely\", line 80, in <module>
        from shapely._buildcfg import geos_version_string, geos_version, \
      File "shapely\", line 200, in <module>
        lgeos = CDLL("geos_c.dll")
      File "C:\Python27\Lib\ctypes\", line 366, in __init__
        self._handle = _dlopen(self._name, mode)
    WindowsError: [Error 126] No se puede encontrar el m¢dulo especificado

Ошибка при использовании как это изменение:

    ['apt-get', 'update'],
    ['apt-get', '--assume-yes', 'install', 'libgeos-dev'],
    ['pip', 'install', 'Shapely'],
    ['echo', 'Custom command worked!']

В результате, как будто пакет не будет установлен, потому что я получаю ошибку с самого начала:

NameError: global name 'Point' is not defined файл:

from __future__ import absolute_import
from __future__ import print_function
import subprocess
from import build as _build
import setuptools

class build(_build):  # pylint: disable=invalid-name
  sub_commands = _build.sub_commands + [('CustomCommands', None)]
    ['apt-get', 'update'],
    ['apt-get', '--assume-yes', 'install', 'libgeos-dev'],
    ['pip', 'install', 'Shapely']]

class CustomCommands(setuptools.Command):  
  def initialize_options(self):

  def finalize_options(self):

  def RunCustomCommand(self, command_list):
    print('Running command: %s' % command_list)
    p = subprocess.Popen(
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # Can use communicate(input='y\n'.encode()) if the command run requires
    # some confirmation.
    stdout_data, _ = p.communicate()
    print('Command output: %s' % stdout_data)
    if p.returncode != 0:
      raise RuntimeError(
          'Command %s failed: exit code: %s' % (command_list, p.returncode))

  def run(self):
    for command in CUSTOM_COMMANDS:


    description='Dataflow set workflow package.',
        'build': build,
        'CustomCommands': CustomCommands,

Параметры конвейера:

 pipeline_options = PipelineOptions()
    pipeline_options.view_as(StandardOptions).streaming = True
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(SetupOptions).setup_file = 'C:\<...>\'

    with beam.Pipeline(options=pipeline_options) as p:


python -m dataflow --project XXX --temp_location gs://YYY --runner DataflowRunner --region europe-west1 --setup_file C:\<...>\

Начало журнала: (до потока данныхдождитесь данных)

INFO:root:Defaulting to the temp_location as staging_location: gs://iotbucketdetector/test/prueba
C:\Users\<...>~1\Desktop\PROYEC~2\env\lib\site-packages\apache_beam\runners\dataflow\ DeprecationWarning: options is deprecated since First stable release.. References to <pipeline>.options will
 not be supported
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pipeline.pb...
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pipeline.pb
INFO:root:Executing command: ['C:\\Users\\<...>~1\\Desktop\\PROYEC~2\\env\\Scripts\\python.exe', '', 'sdist', '--dist-dir', 'c:\\users\\<...>~1\\appdata\\local\\temp\\tmpakq8bs']
running sdist
running egg_info
writing requirements to dataflow.egg-info\requires.txt
writing dataflow.egg-info\PKG-INFO
writing top-level names to dataflow.egg-info\top_level.txt
writing dependency_links to dataflow.egg-info\dependency_links.txt
reading manifest file 'dataflow.egg-info\SOURCES.txt'
writing manifest file 'dataflow.egg-info\SOURCES.txt'
warning: sdist: standard file not found: should have one of README, README.rst, README.txt,

running check
warning: check: missing required meta-data: url

warning: check: missing meta-data: either (author and author_email) or (maintainer and maintainer_email) must be supplied

creating dataflow-0.0.1
creating dataflow-0.0.1\dataflow.egg-info
copying files to dataflow-0.0.1...
copying -> dataflow-0.0.1
copying dataflow.egg-info\PKG-INFO -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\SOURCES.txt -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\dependency_links.txt -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\requires.txt -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\top_level.txt -> dataflow-0.0.1\dataflow.egg-info
Writing dataflow-0.0.1\setup.cfg
Creating tar archive
removing 'dataflow-0.0.1' (and everything under it)
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/workflow.tar.gz...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/workflow.tar.gz
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pickled_main_session...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pickled_main_session
INFO:root:Downloading source distribtution of the SDK from PyPi
INFO:root:Executing command: ['C:\\Users\\<...>~1\\Desktop\\PROYEC~2\\env\\Scripts\\python.exe', '-m', 'pip', 'download', '--dest', 'c:\\users\\<...>~1\\appdata\\local\\temp\\tmpakq8bs', 'apache-beam==2.5.0', '--no-d
eps', '--no-binary', ':all:']
Collecting apache-beam==2.5.0
  Using cached
  Saved c:\users\<...>~1\appdata\local\temp\tmpakq8bs\
Successfully downloaded apache-beam
INFO:root:Staging SDK sources from PyPI to gs://<...>-1120074505-586000.1542699905.588000/dataflow_python_sdk.tar
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/dataflow_python_sdk.tar...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/dataflow_python_sdk.tar
INFO:root:Downloading binary distribtution of the SDK from PyPi
INFO:root:Executing command: ['C:\\Users\\<...>~1\\Desktop\\PROYEC~2\\env\\Scripts\\python.exe', '-m', 'pip', 'download', '--dest', 'c:\\users\\<...>~1\\appdata\\local\\temp\\tmpakq8bs', 'apache-beam==2.5.0', '--no-d
eps', '--only-binary', ':all:', '--python-version', '27', '--implementation', 'cp', '--abi', 'cp27mu', '--platform', 'manylinux1_x86_64']
Collecting apache-beam==2.5.0
  Using cached
  Saved c:\users\<...>~1\appdata\local\temp\tmpakq8bs\apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
Successfully downloaded apache-beam
INFO:root:Staging binary distribution of the SDK from PyPI to gs://<...>-1120074505-586000.1542699905.588000/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
INFO:root:Create job: <Job
 createTime: u'2018-11-20T07:45:28.050865Z'
 currentStateTime: u'1970-01-01T00:00:00Z'
 id: u'2018-11-19_23_45_27-14221834310382472741'
 location: u'europe-west1'
 name: u'beamapp-<...>-1120074505-586000'
 projectId: u'poc-cloud-209212'
 stageStates: []
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)>

Ответы [ 2 ]

0 голосов
/ 23 ноября 2018

Импорт внутри функции +

class GeoDataIngestion:
    def parse_method(self, string_input):
        from shapely.geometry import Point
        place = Point(float(values[2]), float(values[3])) с:

0 голосов
/ 19 ноября 2018

Это потому, что вам нужно указать поток данных для установки желаемого пакета.

Вкратце документация здесь .

Проще говоря, для пакета PyPi, например, shapely, вы можете сделать следующее, чтобы убедиться, что все зависимости установлены.

  1. pip freeze > requirements.txt
  2. Удалить все несвязанные пакеты в requirements.txt
  3. Запустите свою линию с --requirements_file requirements.txt

Или даже больше, если вы хотите сделать что-то вроде установки пакета linux с помощью apt-get или с помощью собственного модуля python. Взгляните на этот официальный пример . Для этого вам нужно настроить и изменить команду конвейера с помощью --setup_file

Для модуля PyPi используйте REQUIRED_PACKAGES в качестве примера.


Если вы используете параметры конвейера, добавьте как

pipeline_options = {
        'project': PROJECT,
        'staging_location': 'gs://' + BUCKET + '/staging',
        'runner': 'DataflowRunner',
        'job_name': 'test',
        'temp_location': 'gs://' + BUCKET + '/temp',
        'save_main_session': True,
options = PipelineOptions.from_dictionary(pipeline_options)
p = beam.Pipeline(options=options)