Как видно, опция job_name
в DataFlowJavaOperator
переопределяется на task_id
.Имя задания будет иметь задачу в качестве префикса и добавлять суффикс произвольного идентификатора.Если вы все еще хотите иметь имя задания Dataflow, которое на самом деле отличается от идентификатора задачи, вы можете добавить его в Java-коде Dataflow:
options.setJobName("jobNameInCode")
Затем, используя PythonOperator
, вы можете получитьидентификатор задания из префикса (либо имя задания, указанное в коде, либо иначе идентификатор задания Composer), как я объяснил здесь .Вкратце, перечислите задания с помощью:
result = dataflow.projects().locations().jobs().list(
projectId=project,
location=location,
).execute()
, а затем отфильтруйте по префиксу, где job_prefix
- это job_name
, определенный при запуске задания:
for job in result['jobs']:
if re.findall(r'' + re.escape(job_prefix) + '', job['name']):
job_id = job['id']
break
Оператор break доступен дляубедитесь, что мы получили только последнюю работу с таким именем, которое должно быть только что запущенным.