У меня проблемы с запуском задания Dataflow из Datalab. То, что я мог сделать, - это минимально работающий пример кода Python для этой ситуации, так как он, по-видимому, недоступен в документации по Google Cloud Platform или Apache Beam.
Было бы очень полезно увидеть код Python, который я могу запустить из ячейки Datalab, которая выполняет следующие действия.
# 1. Sets up the job
# 2. Defines the processing logic to be applied to the input data files
# 3. Saves the processed files to an output folder
# 4. Submits the job to Google Cloud Dataflow
Чтобы решить эту проблему, я попытался поиграть с примерами подсчета слов из документов Google и Apache и адаптировать их для использования в Datalab. Код для этого следующий, но мне не ясно, какие биты я могу вырезать, чтобы превратить его в действительно минимальный рабочий пример.
from __future__ import absolute_import
import argparse
import logging
import re
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
def run(argv=None):
"""Main entry point; defines and runs the wordcount pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='gs://data-analytics/kinglear.txt',
help='Input file to process.')
parser.add_argument('--output',
dest='output',
default='gs://data-analytics/output',
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=project',
'--staging_location=gs://staging',
'--temp_location=gs://tmp',
'--job_name=your-wordcount-job',
])
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
# Read the text file[pattern] into a PCollection.
lines = p | ReadFromText(known_args.input)
# Count the occurrences of each word.
counts = (
lines
| 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
.with_output_types(unicode))
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| 'GroupAndSum' >> beam.CombinePerKey(sum))
# Format the counts into a PCollection of strings.
def format_result(word_count):
(word, count) = word_count
return '%s: %s' % (word, count)
output = counts | 'Format' >> beam.Map(format_result)
# Write the output using a "Write" transform that has side effects.
output | WriteToText(known_args.output)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Заранее спасибо!
Josh