Можно создать worflow (поток воздуха) между s3 (файл csv), чтобы сохранить его в mongodb? - PullRequest
0 голосов
/ 19 декабря 2018

Я хочу создать рабочий процесс, который берет файлы из s3 и сохраняет данные в mongodb, поэтому я пришел с таким подходом:

dag = DAG('s3_to_mongo',
        schedule_interval='@daily',
        catchup=False)


first_task = DummyOperator(task_id='dummy_task', dag=dag)

s3_mongo_task = S3ToMongoOperator(s3_conn_id='', s3_bucket='', s3_key='',
                                mongo_conn_id='', mongo_collection='', mongo_method='insert',
                                mongo_db=None, mongo_replacement_filter=None, upsert=False, dag=dag)

first_task >> s3_mongo_task

Я использую оператор для официальной документации:https://github.com/airflow-plugins/mongo_plugin/blob/master/operators/s3_to_mongo_operator.py

1 Ответ

0 голосов
/ 26 декабря 2018

Airflow и Mongo Plugin не поддерживают CSV-файлы на S3.Вам нужно написать свой собственный оператор.

class S3CsvToMongoOperator(S3ToMongoOperator):
    def __init__(*args, **kwargs):
        super().__init__(*args, **kwargs)

    def execute(self, context):
        s3 = S3Hook(self.s3_conn_id)
        mongo = MongoHook(conn_id=self.mongo_conn_id)

        data = (s3
                .get_key(self.s3_key,
                         bucket_name=self.s3_bucket)
                .get_contents_as_string(encoding='utf-8'))

        lines = data.split('\n')
        docs = [doc for doc in csv.DictReader(lines)]

        self.method_mapper(mongo, docs)
...