Как получить идентификатор задания или результат воздушного потока DataFlowJavaOperator ()? - PullRequest
0 голосов
/ 07 февраля 2019

Я использую DataFlowJavaOperator () в потоке воздуха (Cloud Composer).Есть ли способ получить идентификатор выполненного задания потока данных в следующей задаче PythonOperator?Я хотел бы использовать job_id для вызова команды gcloud для получения результата задания.

def check_dataflow(ds, **kwargs)
  # here I want to execute gloud command with the job ID to get job result.
  # gcloud dataflow jobs describe <JOB_ID>

t1 = DataFlowJavaOperator(
    task_id='task1'
    jar='gs://path/to/jar/abc.jar',
    options={
        'stagingLocation': "gs://stgLocation/",
        'tempLocation': "gs://tmpLocation/",
    },
    provide_context=True
    dag=dag,
 )

t2 = PythonOperator(
    task_id='task2',
    python_callable=check_dataflow,
    provide_context=True
    dag=dag,
)


t1 >> t2

1 Ответ

0 голосов
/ 07 февраля 2019

Как видно, опция job_name в DataFlowJavaOperator переопределяется на task_id.Имя задания будет иметь задачу в качестве префикса и добавлять суффикс произвольного идентификатора.Если вы все еще хотите иметь имя задания Dataflow, которое на самом деле отличается от идентификатора задачи, вы можете добавить его в Java-коде Dataflow:

options.setJobName("jobNameInCode")

Затем, используя PythonOperator, вы можете получитьидентификатор задания из префикса (либо имя задания, указанное в коде, либо иначе идентификатор задания Composer), как я объяснил здесь .Вкратце, перечислите задания с помощью:

result = dataflow.projects().locations().jobs().list(
  projectId=project,
  location=location,
).execute()

, а затем отфильтруйте по префиксу, где job_prefix - это job_name, определенный при запуске задания:

for job in result['jobs']:
  if re.findall(r'' + re.escape(job_prefix) + '', job['name']):
    job_id = job['id']
    break

Оператор break доступен дляубедитесь, что мы получили только последнюю работу с таким именем, которое должно быть только что запущенным.

...