Я хочу использовать GCSToSFTPOperator
в моей среде GCP composer, у нас есть версия ariflow 1.10.3
, composer-1.8.3-airflow-1.10.3
(у меня версия с 1.10.2 до 1.10.3) в среде GCP composer. GCSToSFTPOperator
присутствует в последнем выпуске Airflow. См. Ниже ссылку - https://airflow.readthedocs.io/en/latest/_modules/airflow/operators/gcs_to_sftp.html
Я также пытался с плагином, я скопировал исходный код класса GCSToSFTPOperator
в папку плагинов, затем импортировал в мой python DAG, теперь также я получаю ошибка для airflow.gcp
, после чего я попытался установить пакет gcp 0.2.1
pypi в среде composer, где также не удалось установить ошибку.
Шаг 1 - Создание кода DAG, который находится в папке DAG
import os
from airflow import DAG
from airflow import models
from PluginGCSToSFTPOperator import GCSToSFTPOperator
from airflow.utils.dates import days_ago
default_args = {"start_date": days_ago(1)}
BUCKET_SRC = "bucket-name"
OBJECT_SRC = "parent-1.bin"
DESTINATION_PATH = "/tmp/single-file/"
with models.DAG(
"example_gcs_to_sftp", default_args=default_args, schedule_interval=None,
tags=['example']
) as dag:
copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
task_id="file-copy-gsc-to-sftp",
source_bucket=BUCKET_SRC,
source_object=OBJECT_SRC,
destination_path=DESTINATION_PATH,
)
copy_file_from_gcs_to_sftp
Шаг 2 - скопировал код класса GCSToSFTPOperator и вставил в один файл python, тот же файл помещен в папку плагина.
import os
from tempfile import NamedTemporaryFile
from typing import Optional
#from airflow.plugins_manager import AirflowPlugin
from airflow import AirflowException
from airflow.gcp.hooks.gcs import GCSHook
from airflow.models import BaseOperator
from airflow.providers.sftp.hooks.sftp_hook import SFTPHook
from airflow.utils.decorators import apply_defaults
WILDCARD = "*"
class GCSToSFTPOperator(BaseOperator):
template_fields = ("source_bucket", "source_object", "destination_path")
ui_color = "#f0eee4"
# pylint: disable=too-many-arguments
@apply_defaults
def __init__(
self,
source_bucket: str,
source_object: str,
destination_path: str,
move_object: bool = False,
gcp_conn_id: str = "google_cloud_default",
sftp_conn_id: str = "ssh_default",
delegate_to: Optional[str] = None,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.source_bucket = source_bucket
self.source_object = source_object
self.destination_path = destination_path
self.move_object = move_object
self.gcp_conn_id = gcp_conn_id
self.sftp_conn_id = sftp_conn_id
self.delegate_to = delegate_to
self.sftp_dirs = None
def execute(self, context):
gcs_hook = GCSHook(
gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
)
sftp_hook = SFTPHook(self.sftp_conn_id)
if WILDCARD in self.source_object:
total_wildcards = self.source_object.count(WILDCARD)
if total_wildcards > 1:
raise AirflowException(
"Only one wildcard '*' is allowed in source_object parameter. "
"Found {} in {}.".format(total_wildcards, self.source_object)
)
prefix, delimiter = self.source_object.split(WILDCARD, 1)
objects = gcs_hook.list(
self.source_bucket, prefix=prefix, delimiter=delimiter
)
for source_object in objects:
destination_path = os.path.join(self.destination_path, source_object)
self._copy_single_object(
gcs_hook, sftp_hook, source_object, destination_path
)
self.log.info(
"Done. Uploaded '%d' files to %s", len(objects), self.destination_path
)
else:
destination_path = os.path.join(self.destination_path, self.source_object)
self._copy_single_object(
gcs_hook, sftp_hook, self.source_object, destination_path
)
self.log.info(
"Done. Uploaded '%s' file to %s", self.source_object, destination_path
)
def _copy_single_object(
self,
gcs_hook: GCSHook,
sftp_hook: SFTPHook,
source_object: str,
destination_path: str,
) -> None:
"""
Helper function to copy single object.
"""
self.log.info(
"Executing copy of gs://%s/%s to %s",
self.source_bucket,
source_object,
destination_path,
)
dir_path = os.path.dirname(destination_path)
sftp_hook.create_directory(dir_path)
with NamedTemporaryFile("w") as tmp:
gcs_hook.download(
bucket_name=self.source_bucket,
object_name=source_object,
filename=tmp.name,
)
sftp_hook.store_file(destination_path, tmp.name)
if self.move_object:
self.log.info(
"Executing delete of gs://%s/%s", self.source_bucket, source_object
)
gcs_hook.delete(self.source_bucket, source_object)
Шаг 3 - Я попытался поместить тот же файл в папку DAG также, после этого также появляется та же ошибка «Нет модуля с именем« airflow.gcp »»
Теперь, что я могу попробовать? Есть ли альтернативный оператор или у нас есть какой-то другой способ использовать этот GCSToSFTPOperator
в версии airflow 1.10.3 ??