Как запустить задание Dataflow из Datalab в Python? - PullRequest
0 голосов
/ 11 января 2019

У меня проблемы с запуском задания Dataflow из Datalab. То, что я мог сделать, - это минимально работающий пример кода Python для этой ситуации, так как он, по-видимому, недоступен в документации по Google Cloud Platform или Apache Beam.

Было бы очень полезно увидеть код Python, который я могу запустить из ячейки Datalab, которая выполняет следующие действия.

# 1. Sets up the job

# 2. Defines the processing logic to be applied to the input data files

# 3. Saves the processed files to an output folder

# 4. Submits the job to Google Cloud Dataflow

Чтобы решить эту проблему, я попытался поиграть с примерами подсчета слов из документов Google и Apache и адаптировать их для использования в Datalab. Код для этого следующий, но мне не ясно, какие биты я могу вырезать, чтобы превратить его в действительно минимальный рабочий пример.

from __future__ import absolute_import
import argparse
import logging
import re
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None):
  """Main entry point; defines and runs the wordcount pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      dest='input',
                      default='gs://data-analytics/kinglear.txt',
                      help='Input file to process.')
  parser.add_argument('--output',
                      dest='output',
                      default='gs://data-analytics/output',
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_args.extend([
      '--runner=DataflowRunner',
      '--project=project',
      '--staging_location=gs://staging',
      '--temp_location=gs://tmp',
      '--job_name=your-wordcount-job',
  ])

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  with beam.Pipeline(options=pipeline_options) as p:

    # Read the text file[pattern] into a PCollection.
    lines = p | ReadFromText(known_args.input)

    # Count the occurrences of each word.
    counts = (
        lines
        | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  .with_output_types(unicode))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))

    # Format the counts into a PCollection of strings.
    def format_result(word_count):
      (word, count) = word_count
      return '%s: %s' % (word, count)

    output = counts | 'Format' >> beam.Map(format_result)

    # Write the output using a "Write" transform that has side effects.
    output | WriteToText(known_args.output)

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

Заранее спасибо!

Josh

Ответы [ 2 ]

0 голосов
/ 15 января 2019

Я решил эту проблему с помощью руководств здесь: https://github.com/hayatoy/dataflow-tutorial и теперь могу запустить задание потока данных из Datalab со следующим кодом.

import apache_beam as beam

# Pipeline options:
options                         = beam.options.pipeline_options.PipelineOptions()
gcloud_options                  = options.view_as(beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name         = 'test'
gcloud_options.project          = 'project'
gcloud_options.staging_location = 'gs://staging'
gcloud_options.temp_location    = 'gs://tmp'
gcloud_options.region           = 'europe-west2'

# Worker options:
worker_options                  = options.view_as(beam.options.pipeline_options.WorkerOptions)
worker_options.disk_size_gb     = 30
worker_options.max_num_workers  = 10

# Standard options:
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'

# Pipeline:
PL = beam.Pipeline(options=options)

(
      PL | 'read'  >> beam.io.ReadFromText('gs://input.txt')
         | 'write' >> beam.io.WriteToText ('gs://output.txt', num_shards=1)
)

PL.run()

Спасибо

Josh

0 голосов
/ 11 января 2019

Я думаю, что вы путаете то, что Datalab делает с тем, что делает Dataflow. Это две разные платформы программирования, и вы смешиваете все вместе. Ваш комментарий: Defines the processing logic to be applied to the input data files. Логика обработки - это то, что обеспечивает исходный код (или шаблоны) для Cloud Dataflow, а не код, выполняемый в записных книжках Cloud Datalab.

Как вариант: Если вы устанавливаете библиотеки Cloud Dataflow и используете Python 2.x, вы можете написать программное обеспечение Cloud Dataflow (Apache Beam) в записных книжках Datalab. Этот код будет запускаться локально внутри Datalab и не будет запускать задание потока данных.

Вот несколько ссылок, которые помогут вам написать программное обеспечение, которое будет создавать задания Cloud Dataflow.

Вот ответ StackOverflow, который покажет вам, как запустить задание Dataflow в python:

https://stackoverflow.com/a/52405696/8016720

Документация Google Dataflow для Java, но с подробными объяснениями необходимых шагов:

Метод: projects.jobs.list

Это ссылка на API-интерфейс клиента потока данных:

API клиента потока данных

...