Я пытаюсь запустить несколько тестов на созданном пользовательском хуке apache airflow. Тесты пройдены, но есть странная ошибка (которая не влияет на тесты), как показано ниже. Это странно, потому что BrazeHook можно импортировать и использовать, и он не вызывает ошибок ни в каком другом классе или тестах
[2019-06-21 16:11:27,748] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=53175
[2019-06-21 16:11:29,243] {{plugins_manager.py:143}} ERROR - cannot import name 'BrazeHook'
Traceback (most recent call last):
File "/xxxxx/xxx/venv/lib/python3.6/site-packages/airflow/plugins_manager.py", line 137, in <module>
m = imp.load_source(namespace, filepath)
File "/xxxxx/bloodflow/venv/lib/python3.6/imp.py", line 172, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 684, in _load
File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 678, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/xxxx/xxx/plugins/operators/s3_to_braze_operator.py", line 9, in <module>
from plugins.hooks.braze_hook import BrazeHook
ImportError: cannot import name 'BrazeHook'
[2019-06-21 16:11:29,246] {{plugins_manager.py:144}} ERROR - Failed to import plugin /xxxx/xxx/plugins/operators/x_to_braze_operator.py
Ran 1 test in 0.071s
OK
Класс
import requests
from braze.client import BrazeClient, BrazeClientError, BrazeInternalServerError
class BrazeImportError(Exception):
def __init__(self, message):
"""
Error occured while trying to import the users data into Braze
:param str message: error message returned by Braze or message out ot anu other exception.
"""
self.message = message
super(BrazeImportError, self).__init__()
class BrazeHook(BaseHook):
def __init__(self, braze_conn_id='braze', *args, **kwargs):
self.connection = self.get_connection(braze_conn_id)
def track(self, attributes=None, events=None, purchases=None):
"""
adds/modifies user data through /users/track endpoint of braze
:param attributes: dict or list of user attributes dict (external_id, ... }
:return:
:throws BrazeClientError encapsulation various errors returned by Braze
"""
try:
client = BrazeClient(api_url=self.connection.host, api_key=self.connection.extra_dejson.get('api_key'))
return client.user_track(attributes=attributes, events=events, purchases=purchases)
except requests.exceptions.ConnectionError as ce:
raise BrazeImportError(ce.args[0])
except (BrazeClientError, BrazeInternalServerError) as be:
raise BrazeImportError(message=str(be))
Тест
class TestBrazeHook(unittest.TestCase):
@responses.activate
def test_request_path(self):
responses.add(responses.POST, "https://brazeurl/users/track",
json={'errors': u'', u'message': u'success', 'status_code': 200, 'success': True}, status=201)
handler = BrazeHook(braze_conn_id="braze")
response = handler.track(attributes=[])
self.assertEqual(201, response['status_code'])
UPDATE
добавление оператора, на который ссылается сообщение об ошибке, в соответствии с запросом в комментариях
class S3ToBrazeOperator(BaseOperator):
"""
Copies Reader Score based audiences from S3 to Braze
"""
template_fields = ()
template_ext = ()
ui_color = '#ededed'
@apply_defaults
def __init__(self,
s3_bucket,
s3_path,
s3_conn_id,
braze_conn_id,
columns,
*args, **kwargs):
super(S3ToBrazeOperator, self).__init__(*args, **kwargs)
self.s3_bucket = s3_bucket
self.s3_path = s3_path
self.s3_conn_id = s3_conn_id
self.braze_conn_id = braze_conn_id
self.columns = columns
def execute(self, context):
request_handler = BrazeHook(braze_conn_id=self.braze_conn_id)
....
scores = dd.read_csv(f"s3://{self.s3_bucket}/{self.s3_path}/*",
sep=";", compression="gzip", header=None, storage_options=options).compute()
scores.columns = self.columns
attributes = []
# batches the results according to step size
for batch in chunk(scores, BRAZE_API_STEP):
batch.apply(lambda user: {
"external_id": user.user_id,
....
}, axis=1).apply(lambda attribute: attributes.append(attribute))
try:
request_handler.track(attributes=attributes)
except Exception as e:
logging.error("Error {}".format(str(e)))
raise