Триггер даг с параметрами - PullRequest
0 голосов
/ 10 января 2020

Я использую Composer для процесса ETL. Я хочу вызвать dag с виртуальной машины в Google Cloud и использовать результат процесса, выполняющегося на виртуальной машине, в качестве параметра для моего dag.

Я следую официальному руководству Google о том, как это сделать:

https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf#trigger_response_dagpy

Где они указывают, что dag_run.conf будет содержать информацию о функции облака. Я запускаю dag, отправляя ненулевое значение для параметра data и нулевое значение для параметра context (как я запускаю с виртуальной машины, поэтому нет контекста) следующим образом:

trigger_dag({u"process_result":u"value_of_process_result"})

Даг срабатывает, но свойство dag_run.conf равно None как dag_run.conf, в отличие от предоставленного ими примера.

Dag Trigger echo results

Функция trigger_dag просто создает запрос к приложению, защищенному с помощью Identity-Aware Proxy.

Есть ли что-нибудь дополнительное, что необходимо сделать, чтобы пробел в потоке воздуха захватывал полезную нагрузку из почтовый запрос при создании свойства dag_run.conf?

В конце я прилагаю код (также можно найти по ссылке, указанной выше):

Код для запуска DAG

import google.auth
import google.auth.compute_engine.credentials
import google.auth.iam
from google.auth.transport.requests import Request
import google.oauth2.credentials
import google.oauth2.service_account
import requests


IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'


def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API
When called via Google Cloud Functions (GCF),
data and context are Background function parameters.

For more info, refer to
https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

To call this function from a Python script, omit the ``context`` argument
and pass in a non-null value for the ``data`` argument.
"""

# Fill in with your Composer info here
# Navigate to your webserver's login page and get this from the URL
# Or use the script found at
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/composer/rest/get_client_id.py
client_id = 'YOUR-CLIENT-ID'
# This should be part of your webserver's URL:
# {tenant-project-id}.appspot.com
webserver_id = 'YOUR-TENANT-PROJECT'
# The name of the DAG you wish to trigger
dag_name = 'composer_sample_trigger_response_dag'
webserver_url = (
    'https://'
    + webserver_id
    + '.appspot.com/api/experimental/dags/'
    + dag_name
    + '/dag_runs'
)
# Make a POST request to IAP which then Triggers the DAG
make_iap_request(webserver_url, client_id, method='POST', json=data)


# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method='GET', **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
Args:
  url: The Identity-Aware Proxy-protected URL to fetch.
  client_id: The client ID used by Identity-Aware Proxy.
  method: The request method to use
          ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
  **kwargs: Any of the parameters defined for the request function:
            https://github.com/requests/requests/blob/master/requests/api.py
            If no timeout is provided, it is set to 90 by default.

Returns:
  The page body, or raises an exception if the page couldn't be retrieved.
"""
# Set the default timeout, if missing
if 'timeout' not in kwargs:
    kwargs['timeout'] = 90

# Figure out what environment we're running in and get some preliminary
# information about the service account.
bootstrap_credentials, _ = google.auth.default(
    scopes=[IAM_SCOPE])

# For service account's using the Compute Engine metadata service,
# service_account_email isn't available until refresh is called.
bootstrap_credentials.refresh(Request())

signer_email = bootstrap_credentials.service_account_email
if isinstance(bootstrap_credentials,
              google.auth.compute_engine.credentials.Credentials):
    # Since the Compute Engine metadata service doesn't expose the service
    # account key, we use the IAM signBlob API to sign instead.
    # In order for this to work:
    # 1. Your VM needs the https://www.googleapis.com/auth/iam scope.
    #    You can specify this specific scope when creating a VM
    #    through the API or gcloud. When using Cloud Console,
    #    you'll need to specify the "full access to all Cloud APIs"
    #    scope. A VM's scopes can only be specified at creation time.
    # 2. The VM's default service account needs the "Service Account Actor"
    #    role. This can be found under the "Project" category in Cloud
    #    Console, or roles/iam.serviceAccountActor in gcloud.
    signer = google.auth.iam.Signer(
        Request(), bootstrap_credentials, signer_email)
else:
    # A Signer object can sign a JWT using the service account's key.
    signer = bootstrap_credentials.signer

# Construct OAuth 2.0 service account credentials using the signer
# and email acquired from the bootstrap credentials.
service_account_credentials = google.oauth2.service_account.Credentials(
    signer, signer_email, token_uri=OAUTH_TOKEN_URI, additional_claims={
        'target_audience': client_id
    })
# service_account_credentials gives us a JWT signed by the service
# account. Next, we use that to obtain an OpenID Connect token,
# which is a JWT signed by Google.
google_open_id_connect_token = get_google_open_id_connect_token(
    service_account_credentials)

# Fetch the Identity-Aware Proxy-protected URL, including an
# Authorization header containing "Bearer " followed by a
# Google-issued OpenID Connect token for the service account.
resp = requests.request(
    method, url,
    headers={'Authorization': 'Bearer {}'.format(
        google_open_id_connect_token)}, **kwargs)
if resp.status_code == 403:
    raise Exception('Service account {} does not have permission to '
                    'access the IAP-protected application.'.format(
                        signer_email))
elif resp.status_code != 200:
    raise Exception(
        'Bad response from application: {!r} / {!r} / {!r}'.format(
            resp.status_code, resp.headers, resp.text))
else:
    return resp.text


def get_google_open_id_connect_token(service_account_credentials):
    """Get an OpenID Connect token issued by Google for the service account.



   This function:
  1. Generates a JWT signed with the service account's private key
     containing a special "target_audience" claim.

  2. Sends it to the OAUTH_TOKEN_URI endpoint. Because the JWT in #1
     has a target_audience claim, that endpoint will respond with
     an OpenID Connect token for the service account -- in other words,
     a JWT signed by *Google*. The aud claim in this JWT will be
     set to the value from the target_audience claim in #1.

For more information, see
https://developers.google.com/identity/protocols/OAuth2ServiceAccount .
The HTTP/REST example on that page describes the JWT structure and
demonstrates how to call the token endpoint. (The example on that page
shows how to get an OAuth2 access token; this code is using a
modified version of it to get an OpenID Connect token.)
"""

service_account_jwt = (
    service_account_credentials._make_authorization_grant_assertion())
request = google.auth.transport.requests.Request()
body = {
    'assertion': service_account_jwt,
    'grant_type': google.oauth2._client._JWT_GRANT_TYPE,
}
token_response = google.oauth2._client._token_endpoint_request(
    request, OAUTH_TOKEN_URI, body)
return token_response['id_token']
# END COPIED IAP CODE

DAG, который печатает результат

import datetime

import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

1 Ответ

1 голос
/ 13 января 2020

Похоже, что пример в документации Google Cloud может быть устаревшим. Конечная точка Airflow ожидает чтения любых данных из свойства conf объекта JSON. Однако функция Python отправляет данные как есть, не заключая их в свойство conf.

Код Python можно изменить, чтобы учесть это, изменив строку

make_iap_request(webserver_url, client_id, method='POST', json=data)

до

make_iap_request(webserver_url, client_id, method='POST', json={'conf': data})

Кроме того, если вы используете версию Composer старше 1.10.2, вам также может потребоваться преобразовать данные в строку перед отправкой; это можно сделать с помощью json.dumps(data).

Я надеюсь, что это решение подойдет вам.

...