Отладка пользовательского датчика воздушного потока с помощью Pytest + mock - PullRequest
0 голосов
/ 29 октября 2018

Я написал пользовательский датчик потока воздуха, который ожидает, чтобы файл в s3 был изменен позднее, чем дата выполнения dag. Класс:

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

class S3FileModifiedAtSensor(BaseSensorOperator):

@apply_defaults
def __init__(self,
             job_name_in_s3,
             bucket_name,
             *args,
             **kwargs):
    super(S3FileModifiedAtSensor, self).__init__(*args, **kwargs)
    self.job_name_in_s3 = job_name_in_s3
    self.bucket_name = bucket_name

def poke(self, context):

    self.last_modified = self.get_last_modified()

    dag = context['dag']
    exec_date = dag.following_schedule(context['execution_date'])

    print("last modified = " + str(self.last_modified.__class__))

    return self.last_modified > exec_date

def get_last_modified(self):

    S3_CONN_ID = 's3_conn'

    from airflow.hooks.S3_hook import S3Hook
    hook = S3Hook(S3_CONN_ID)
    key = hook.get_key(self.job_name_in_s3, self.bucket_name)
    return key.last_modified

Я пытался написать тест, используя pytest + pytest-mock, но этот тест не пройдет. Я попытался напечатать last_modified, как показано выше, но класс - MagicMock. Вот тест;

from datetime import datetime, timedelta
from airflow import DAG

def test_poke(mocker):
test_dag = DAG(
    'test-dags',
    catchup=False,
    start_date=datetime(2017, 9, 18, 2, 0, 0),
    schedule_interval=timedelta(minutes=15)
)
mock_sensor = mocker.patch('sensors.trigger.s3_modified_at_sensor.S3FileModifiedAtSensor')

mock_sensor.return_value.get_last_modified = datetime(2018, 7, 18, 2, 0, 0)

context = {'dag': test_dag, 'execution_date': datetime(2018, 9, 18, 2, 0, 0)}

assert (not mock_sensor.poke(context))

Кто-нибудь знает, как мне лучше всего это проверить?

...