Airflow и Mongo Plugin не поддерживают CSV-файлы на S3.Вам нужно написать свой собственный оператор.
class S3CsvToMongoOperator(S3ToMongoOperator):
def __init__(*args, **kwargs):
super().__init__(*args, **kwargs)
def execute(self, context):
s3 = S3Hook(self.s3_conn_id)
mongo = MongoHook(conn_id=self.mongo_conn_id)
data = (s3
.get_key(self.s3_key,
bucket_name=self.s3_bucket)
.get_contents_as_string(encoding='utf-8'))
lines = data.split('\n')
docs = [doc for doc in csv.DictReader(lines)]
self.method_mapper(mongo, docs)