Как мы можем использовать GCSToSFTPOperator в среде GCP composer? - PullRequest
4 голосов
/ 16 января 2020

Я хочу использовать GCSToSFTPOperator в моей среде GCP composer, у нас есть версия ariflow 1.10.3, composer-1.8.3-airflow-1.10.3 (у меня версия с 1.10.2 до 1.10.3) в среде GCP composer. GCSToSFTPOperator присутствует в последнем выпуске Airflow. См. Ниже ссылку - https://airflow.readthedocs.io/en/latest/_modules/airflow/operators/gcs_to_sftp.html

Я также пытался с плагином, я скопировал исходный код класса GCSToSFTPOperator в папку плагинов, затем импортировал в мой python DAG, теперь также я получаю ошибка для airflow.gcp, после чего я попытался установить пакет gcp 0.2.1 pypi в среде composer, где также не удалось установить ошибку.

Шаг 1 - Создание кода DAG, который находится в папке DAG

import os
from airflow import DAG
from airflow import models
from PluginGCSToSFTPOperator import GCSToSFTPOperator
from airflow.utils.dates import days_ago

default_args = {"start_date": days_ago(1)}

BUCKET_SRC = "bucket-name"
OBJECT_SRC = "parent-1.bin"
DESTINATION_PATH = "/tmp/single-file/"


with models.DAG(
    "example_gcs_to_sftp", default_args=default_args, schedule_interval=None, 
    tags=['example']
) as dag:

    copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
        task_id="file-copy-gsc-to-sftp",
        source_bucket=BUCKET_SRC,
        source_object=OBJECT_SRC,
        destination_path=DESTINATION_PATH,
    )

    copy_file_from_gcs_to_sftp

Шаг 2 - скопировал код класса GCSToSFTPOperator и вставил в один файл python, тот же файл помещен в папку плагина.

import os
from tempfile import NamedTemporaryFile
from typing import Optional

#from airflow.plugins_manager import AirflowPlugin
from airflow import AirflowException
from airflow.gcp.hooks.gcs import GCSHook
from airflow.models import BaseOperator
from airflow.providers.sftp.hooks.sftp_hook import SFTPHook
from airflow.utils.decorators import apply_defaults

WILDCARD = "*"

class GCSToSFTPOperator(BaseOperator):

template_fields = ("source_bucket", "source_object", "destination_path")

ui_color = "#f0eee4"


# pylint: disable=too-many-arguments
@apply_defaults
def __init__(
    self,
    source_bucket: str,
    source_object: str,
    destination_path: str,
    move_object: bool = False,
    gcp_conn_id: str = "google_cloud_default",
    sftp_conn_id: str = "ssh_default",
    delegate_to: Optional[str] = None,
    *args,
    **kwargs
) -> None:
    super().__init__(*args, **kwargs)

    self.source_bucket = source_bucket
    self.source_object = source_object
    self.destination_path = destination_path
    self.move_object = move_object
    self.gcp_conn_id = gcp_conn_id
    self.sftp_conn_id = sftp_conn_id
    self.delegate_to = delegate_to
    self.sftp_dirs = None

def execute(self, context):
    gcs_hook = GCSHook(
        gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
    )

    sftp_hook = SFTPHook(self.sftp_conn_id)

    if WILDCARD in self.source_object:
        total_wildcards = self.source_object.count(WILDCARD)
        if total_wildcards > 1:
            raise AirflowException(
                "Only one wildcard '*' is allowed in source_object parameter. "
                "Found {} in {}.".format(total_wildcards, self.source_object)
            )

        prefix, delimiter = self.source_object.split(WILDCARD, 1)
        objects = gcs_hook.list(
            self.source_bucket, prefix=prefix, delimiter=delimiter
        )

        for source_object in objects:
            destination_path = os.path.join(self.destination_path, source_object)
            self._copy_single_object(
                gcs_hook, sftp_hook, source_object, destination_path
            )

        self.log.info(
            "Done. Uploaded '%d' files to %s", len(objects), self.destination_path
        )
    else:
        destination_path = os.path.join(self.destination_path, self.source_object)
        self._copy_single_object(
            gcs_hook, sftp_hook, self.source_object, destination_path
        )
        self.log.info(
            "Done. Uploaded '%s' file to %s", self.source_object, destination_path

        )

def _copy_single_object(
    self,
    gcs_hook: GCSHook,
    sftp_hook: SFTPHook,
    source_object: str,
    destination_path: str,
) -> None:
    """
    Helper function to copy single object.
    """
    self.log.info(
        "Executing copy of gs://%s/%s to %s",
        self.source_bucket,
        source_object,
        destination_path,
    )

    dir_path = os.path.dirname(destination_path)
    sftp_hook.create_directory(dir_path)

    with NamedTemporaryFile("w") as tmp:
        gcs_hook.download(
            bucket_name=self.source_bucket,
            object_name=source_object,
            filename=tmp.name,
        )
        sftp_hook.store_file(destination_path, tmp.name)

    if self.move_object:
        self.log.info(
            "Executing delete of gs://%s/%s", self.source_bucket, source_object
        )
        gcs_hook.delete(self.source_bucket, source_object)

Шаг 3 - Я попытался поместить тот же файл в папку DAG также, после этого также появляется та же ошибка «Нет модуля с именем« airflow.gcp »»

Теперь, что я могу попробовать? Есть ли альтернативный оператор или у нас есть какой-то другой способ использовать этот GCSToSFTPOperator в версии airflow 1.10.3 ??

1 Ответ

0 голосов
/ 17 января 2020

Документация, которую вы искали, - версия Airflow 1.10.7, самая последняя. Когда вы обратитесь к документации 1.10.2 Airflow , вы увидите, что в этой версии отсутствует оператор gcs_to_sftp.

Можно попробовать скопировать код , создайте плагин и поместите код в каталог плагинов в корзине экземпляров Composer. Если у вас все еще есть проблема, пожалуйста, укажите все шаги, которые вы уже предприняли, и я постараюсь вам помочь.

Вы также можете прочитать больше о обновлении версии Airflow в Composer.

...