Воздушный поток: как реализовать Dynami c html_content - PullRequest
0 голосов
/ 16 июня 2020
• 1000 be Dynami c

Пример ниже является одним из тела письма:

  The `filename` has been delivered.  `0 rows` for contact from 2020-06-14. If you have any questions or concerns regarding this feed please reply to this email

    NOTE: The information contained in this email message is considered confidential and proprietary to the sender and is intended solely for review and use by the named recipient. Any unauthorized review, use, or distribution is strictly prohibited. If you have received this message in error, please advise the sender by reply email and delete the message.

Код:

def execute(self, context):

        if self.source_task_ids:
            ti = context['task_instance']
            self.s3_key = ti.xcom_pull(task_ids=self.source_task_ids, key='s3_key')[0]

        self.s3_key = self.get_s3_key(self.s3_key)
        s3_hook = S3Hook(self.s3_conn_id)

        try:
            if not s3_hook.check_for_key(self.s3_key, bucket_name=self.s3_bucket):
                logger.info(f'The source key {self.s3_key} does not exist in the {self.s3_bucket}')
                rowcount = 0
                self.subject  = self.subject
                self.html_content = self.html_content
            else:
                filedata = s3_hook.read_key(self.s3_key, bucket_name=self.s3_bucket)
                rowcount = filedata.count('\n') - 1
                logger.info(f'rowcount: {rowcount}')
                self.subject = self.subject
                self.html_content = self.html_content
            self.snd_mail(self.send_from,self.send_to,self.subject, self.html_content, self.eml_server, files=self.files)
        except Exception as e:
            raise AirflowException(f'Error in sending the Email - {e}')

1 Ответ

0 голосов
/ 16 июня 2020

Поддержка воздушного потока Шаблоны Jinja в операторах. Он встроен в BaseOperator и управляется полями template_fields и template_ext базового оператора, например:

class CustomEmailOperator(BaseOperator):
    template_fields = ("html_content")
    template_ext = (".html",)

    @apply_defaults
    def __init__(self, html_content, ...):
        super().__init__(*args, **kwargs)
        self.html_content = html_content

    def execute(self, context):
      # Rest of operator code, nothing special needs to happen to render the templates

Теперь поле html_content может быть либо путем к файлу шаблона jinja с .html расширение или html строка напрямую. Параметры могут быть переданы в шаблон Jinja с помощью поля params оператора:

task1 = CustomEmailOperator(
    task_id = "task1",
    html_content = "Hello, {{ params.name }}",
    params = {
        "name": "John",
    },
    ...
)

Таким образом вы можете передать имя файла и количество параметров строк. Если вы не хотите полагаться на механизм BaseOperator для создания шаблонов содержимого электронной почты, например, потому что вам нужно немного больше контроля, вы также можете использовать вспомогательную функцию, доступную в Airflow:

from airflow.utils.helpers import parse_template_string

html_content = "Hello, {{ params.name }}"
_, template = parse_template_string(html_content)
body = template.render({"name": "John"})
...