Поток данных Apache Beam / GCP, запущенный из AINotebook / Jupyter - PullRequest
0 голосов
/ 07 ноября 2019

Недавно мы перенесли нашу инфраструктуру в GCP, и мы стремимся использовать DataProc (Spark) и DataFlow (Apache Beam) для наших конвейеров данных. Dataproc довольно прост, чтобы заставить его работать, но запуск Dataflow доставляет нам головную боль:

Как мы можем запустить задание Dataflow из JupyterNotebook (например, блокнота AI)

Пример следующий: у меня есть огромный набор данных, который я хочу grou_by, затем сделать фильтр и некоторые вычисления, затем он должен записать объект в определенное ведро (сейчас этот код, я не знаю, как, удаляет корзину, вместо того, чтобы делать что-то полезное)

import datetime, os

def preprocess(in_test_mode):
    import shutil, os, subprocess
    job_name = 'hola'

    if in_test_mode:
        print('Launching local job ... hang on')
        OUTPUT_DIR = './preproc'
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        os.makedirs(OUTPUT_DIR)
    else:
        print('Launching Dataflow job {} ... hang on'.format(job_name))
        OUTPUT_DIR = 'gs://experimentos-con-humanos/'.format(BUCKET)
        try:
            subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
        except:
            pass

    options = {
      'staging_location': os.path.join(OUTPUT_DIR, 'temp'),
      'temp_location': os.path.join(OUTPUT_DIR, 'temp'),
      'job_name': job_name,
      'project': PROJECT,
      'region': REGION,
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True,
      'max_num_workers': 6
    }
    opts = beam.pipeline.PipelineOptions(flags = [], **options)

    if in_test_mode:
        RUNNER = 'DataflowRunner'
    else:
        RUNNER = 'DataflowRunner'

    p = beam.Pipeline(RUNNER, options = opts)
    (p 
         | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(table_spec))    
         | 'hashAsKey' >> beam.Map(lambda r: (r['afi_hash'], r))
         | 'Transpose' >> beam.GroupByKey()
         | 'Filtro menos de 12' >> beam.Filter(lambda r: len(r[1]) >= 12 )    
         | 'calculos' >> beam.Map(calculos)
            #| 'Group and sum' >> beam.
            #| 'Format results' >> beam.
         | 'Write results' >> beam.Map(lambda r: print(r))
         | '{}_out'.format(1) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(1))))
        )

    job = p.run()
    if in_test_mode:
        job.wait_until_finish()
        print("Done!")

preprocess(in_test_mode = False)

1) Это не работает, но работает! 2) Этот код работает, если я изменяю 'DataflowRunner' на 'DirectRunner', это означает, что он работает локально 3) Если я не изменю это, задание не появится в Dataflow, вместо этого удалит корзину GCP, гдеэто работает

PD: у меня есть права администратора для хранения, потока данных и BigQuery PD2: таблица существует, и Bucket у меня есть cuadruple проверка, что у нее есть точное имя PD3: я хотел бызаставить его работать на ноутбуке Jupyter, но это не обязательно, если кто-то задается вопросом

1 Ответ

1 голос
/ 08 ноября 2019

Как сказано в комментариях, проблема, кажется, в части предварительной обработки. В частности, эта часть, которая выполняется по-разному при локальной работе или использовании DataflowRunner:

if in_test_mode:
    print('Launching local job ... hang on')
    OUTPUT_DIR = './preproc'
    shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
    os.makedirs(OUTPUT_DIR)
else:
    print('Launching Dataflow job {} ... hang on'.format(job_name))
    OUTPUT_DIR = 'gs://experimentos-con-humanos/'.format(BUCKET)
    try:
        subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
    except:
        pass

, по-видимому, отвечает за удаление содержимого сегмента (которое используется для вывода, временных файлов и т. Д.),Также обратите внимание, что в этом примере вы на самом деле не добавляете BUCKET к OUTPUT_DIR.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...