Вы можете создать своего собственного оператора, просто скопировав BigQueryOperator и внеся следующие изменения в функции execute
и on_kill
внутри него, или вы также можете переопределить существующий BigQueryOperator
.
def execute(self, context):
if self.bq_cursor is None:
self.log.info( "Beginnging Execution." )
hook = BigQueryHook(
bigquery_conn_id=self.bigquery_conn_id,
use_legacy_sql=self.use_legacy_sql,
delegate_to=self.delegate_to)
conn = hook.get_conn()
self.bq_cursor = conn.cursor()
self.bq_cursor.run_query(
self.sql,
destination_dataset_table=self.destination_dataset_table,
write_disposition=self.write_disposition,
allow_large_results=self.allow_large_results,
flatten_results=self.flatten_results,
udf_config=self.udf_config,
maximum_billing_tier=self.maximum_billing_tier,
maximum_bytes_billed=self.maximum_bytes_billed,
create_disposition=self.create_disposition,
query_params=self.query_params,
labels=self.labels,
schema_update_options=self.schema_update_options,
priority=self.priority,
time_partitioning=self.time_partitioning
)
self.log.info( "Executed: %s" % self.sql )
def on_kill(self):
super(BigQueryOperator, self).on_kill()
self.log.error( "Failed to Execute: %s" % self.sql )
if self.bq_cursor is not None:
self.log.info('Canceling running query due to execution timeout')
self.bq_cursor.cancel_query()
Вы должны добавить собственные операторы в каталог plugins .