Недавно мы перенесли нашу инфраструктуру в GCP, и мы стремимся использовать DataProc (Spark) и DataFlow (Apache Beam) для наших конвейеров данных. Dataproc довольно прост, чтобы заставить его работать, но запуск Dataflow доставляет нам головную боль:
Как мы можем запустить задание Dataflow из JupyterNotebook (например, блокнота AI)
Пример следующий: у меня есть огромный набор данных, который я хочу grou_by, затем сделать фильтр и некоторые вычисления, затем он должен записать объект в определенное ведро (сейчас этот код, я не знаю, как, удаляет корзину, вместо того, чтобы делать что-то полезное)
import datetime, os
def preprocess(in_test_mode):
import shutil, os, subprocess
job_name = 'hola'
if in_test_mode:
print('Launching local job ... hang on')
OUTPUT_DIR = './preproc'
shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
os.makedirs(OUTPUT_DIR)
else:
print('Launching Dataflow job {} ... hang on'.format(job_name))
OUTPUT_DIR = 'gs://experimentos-con-humanos/'.format(BUCKET)
try:
subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
except:
pass
options = {
'staging_location': os.path.join(OUTPUT_DIR, 'temp'),
'temp_location': os.path.join(OUTPUT_DIR, 'temp'),
'job_name': job_name,
'project': PROJECT,
'region': REGION,
'teardown_policy': 'TEARDOWN_ALWAYS',
'no_save_main_session': True,
'max_num_workers': 6
}
opts = beam.pipeline.PipelineOptions(flags = [], **options)
if in_test_mode:
RUNNER = 'DataflowRunner'
else:
RUNNER = 'DataflowRunner'
p = beam.Pipeline(RUNNER, options = opts)
(p
| 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(table_spec))
| 'hashAsKey' >> beam.Map(lambda r: (r['afi_hash'], r))
| 'Transpose' >> beam.GroupByKey()
| 'Filtro menos de 12' >> beam.Filter(lambda r: len(r[1]) >= 12 )
| 'calculos' >> beam.Map(calculos)
#| 'Group and sum' >> beam.
#| 'Format results' >> beam.
| 'Write results' >> beam.Map(lambda r: print(r))
| '{}_out'.format(1) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(1))))
)
job = p.run()
if in_test_mode:
job.wait_until_finish()
print("Done!")
preprocess(in_test_mode = False)
1) Это не работает, но работает! 2) Этот код работает, если я изменяю 'DataflowRunner'
на 'DirectRunner'
, это означает, что он работает локально 3) Если я не изменю это, задание не появится в Dataflow, вместо этого удалит корзину GCP, гдеэто работает
PD: у меня есть права администратора для хранения, потока данных и BigQuery PD2: таблица существует, и Bucket у меня есть cuadruple проверка, что у нее есть точное имя PD3: я хотел бызаставить его работать на ноутбуке Jupyter, но это не обязательно, если кто-то задается вопросом