plugins_manager.py ОШИБКА - Не удалось импортировать плагин - PullRequest
1 голос
/ 21 июня 2019

Я пытаюсь запустить несколько тестов на созданном пользовательском хуке apache airflow. Тесты пройдены, но есть странная ошибка (которая не влияет на тесты), как показано ниже. Это странно, потому что BrazeHook можно импортировать и использовать, и он не вызывает ошибок ни в каком другом классе или тестах

[2019-06-21 16:11:27,748] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=53175
[2019-06-21 16:11:29,243] {{plugins_manager.py:143}} ERROR - cannot import name 'BrazeHook'
Traceback (most recent call last):
  File "/xxxxx/xxx/venv/lib/python3.6/site-packages/airflow/plugins_manager.py", line 137, in <module>
    m = imp.load_source(namespace, filepath)
  File "/xxxxx/bloodflow/venv/lib/python3.6/imp.py", line 172, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 684, in _load
  File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/xxxx/xxx/plugins/operators/s3_to_braze_operator.py", line 9, in <module>
    from plugins.hooks.braze_hook import BrazeHook
ImportError: cannot import name 'BrazeHook'
[2019-06-21 16:11:29,246] {{plugins_manager.py:144}} ERROR - Failed to import plugin /xxxx/xxx/plugins/operators/x_to_braze_operator.py

Ran 1 test in 0.071s

OK

Класс

import requests

from braze.client import BrazeClient, BrazeClientError, BrazeInternalServerError


class BrazeImportError(Exception):
    def __init__(self, message):
        """
        Error occured while trying to import the users data into Braze

        :param str message: error message returned by Braze or message out ot anu other exception.
        """
        self.message = message
        super(BrazeImportError, self).__init__()


class BrazeHook(BaseHook):

    def __init__(self, braze_conn_id='braze', *args, **kwargs):
        self.connection = self.get_connection(braze_conn_id)

    def track(self, attributes=None, events=None, purchases=None):
        """
        adds/modifies user data through /users/track endpoint of braze
        :param attributes: dict or list of user attributes dict (external_id, ... }
        :return:
        :throws BrazeClientError encapsulation various errors returned by Braze
        """
        try:
            client = BrazeClient(api_url=self.connection.host, api_key=self.connection.extra_dejson.get('api_key'))
            return client.user_track(attributes=attributes, events=events, purchases=purchases)
        except requests.exceptions.ConnectionError as ce:
            raise BrazeImportError(ce.args[0])
        except (BrazeClientError, BrazeInternalServerError) as be:
            raise BrazeImportError(message=str(be))

Тест

class TestBrazeHook(unittest.TestCase):

    @responses.activate
    def test_request_path(self):
        responses.add(responses.POST, "https://brazeurl/users/track",
                      json={'errors': u'', u'message': u'success', 'status_code': 200, 'success': True}, status=201)

        handler = BrazeHook(braze_conn_id="braze")

        response = handler.track(attributes=[])

        self.assertEqual(201, response['status_code'])

UPDATE добавление оператора, на который ссылается сообщение об ошибке, в соответствии с запросом в комментариях

class S3ToBrazeOperator(BaseOperator):
    """
    Copies Reader Score based audiences from S3 to Braze
    """

    template_fields = ()
    template_ext = ()
    ui_color = '#ededed'

    @apply_defaults
    def __init__(self,
                 s3_bucket,
                 s3_path,
                 s3_conn_id,
                 braze_conn_id,
                 columns,
                 *args, **kwargs):
        super(S3ToBrazeOperator, self).__init__(*args, **kwargs)
        self.s3_bucket = s3_bucket
        self.s3_path = s3_path
        self.s3_conn_id = s3_conn_id
        self.braze_conn_id = braze_conn_id
        self.columns = columns

    def execute(self, context):
        request_handler = BrazeHook(braze_conn_id=self.braze_conn_id)

        ....

        scores = dd.read_csv(f"s3://{self.s3_bucket}/{self.s3_path}/*",
                             sep=";", compression="gzip", header=None, storage_options=options).compute()

        scores.columns = self.columns

        attributes = []
        # batches the results according to step size
        for batch in chunk(scores, BRAZE_API_STEP):

            batch.apply(lambda user: {
                "external_id": user.user_id,
                ....
            }, axis=1).apply(lambda attribute: attributes.append(attribute))

            try:
                request_handler.track(attributes=attributes)
            except Exception as e:
                logging.error("Error {}".format(str(e)))
                raise

1 Ответ

1 голос
/ 21 июня 2019

Одна вещь, которую вы можете попробовать, это поместить файл __init__.py в каталог плагинов со следующим:

from airflow.plugins_manager import AirflowPlugin

from plugins.hooks.braze_hook import BrazeHook
from plugins.operators.x_to_braze_operator import S3ToBrazeOperator


class BrazePlugin(AirflowPlugin):
    name = "braze_plugin"
    operators  = [S3ToBrazeOperator]
    sensors = []
    hooks = [BrazeHook]
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []
    appbuilder_views = []
    appbuilder_menu_items = []

Это может помочь, но я не на 100%.

...