Как исправить: код Python не работает через поток данных DAG: файл pandas.read_csv ('gs: //x/y.csv') не существует - PullRequest
1 голос
/ 06 июня 2019

код на моем компьютере работает нормально, но когда я помещаю его в DAG для запуска через Airflow, он не работает.Я использую GCP и композитор.Другие задачи отлично работают в том же облачном хранилище.Композитор имеет все необходимые разрешения.

def get_results():
    import pandas as pandas
    df = pandas.read_csv('gs://y/x.csv')

with models.DAG(
        ...) as dag:
     search_simmilar = python_operator.PythonOperator(
         task_id='search',
         python_callable=get_results
     )

Ошибка в журналах воздушного потока:

File "pandas/_libs/parsers.pyx", line 695, in pandas._libs.parsers.TextReader._setup_parser_sourc
FileNotFoundError: File b'gs://y/x.csv' does not exis

Ответы [ 3 ]

1 голос
/ 17 июня 2019

Проверьте версию панд, установленную в вашем композиторе. Версия для панд 0.24.0 Добавлена ​​поддержка чтения из / записи в Google Cloud Storage через библиотеку gcsfs

ПРИМЕЧАНИЕ: обновление версии панд может сломать существующих операторов, так как хуки будут использовать или зависит от более старой версии панд. Например. BigQueryOperator завершится ошибкой из-за зависимости панд. В этом случае вы можете использовать PythonVirtualenvOperator или KubernetesPodOperator, где вы сможете установить зависимые модули (например, pandas> 0.24.0), не затрагивая существующую среду.

0 голосов
/ 07 июня 2019

GCP composer использует Cloud Storage FUSE, который сопоставляет папку dag вашего композитора с корзиной Google cloud storage, в которую вы помещаете свои группы доступности баз данных (например: gs://bucket-name/dags).

Я советую размещать файлы, которыеобщий для дагов в этой папке /home/airflow/gcs/data, который сопоставлен с gs://bucket-name/dags.Здесь вы можете прочитать больше о Google cloud storage и Composer: https://cloud.google.com/composer/docs/concepts/cloud-storage

Также вот пример:

import os
import pandas as pandas

def get_results():
    path_to_csv = os.path.join('/home/airflow/gcs/data', 'y','x.csv') 
    df = pandas.read_csv(path_to_csv, header=None)

with models.DAG(
        ...) as dag:
     search_simmilar = python_operator.PythonOperator(
         task_id='search',
         python_callable=get_results
     )
0 голосов
/ 06 июня 2019

Я могу придумать 2 способа решения этой проблемы:

  • Простой способ
    • поместить файл CSV в папку dags вместе с файлом DAG.py.
    • Composer автоматически отображает свою структуру каталогов на GCS при создании, как указано при просмотре airflow.cfg
      • , этот файл можно найти, просмотрев один уровень в вашем DAGs folder в GCPConsole Composer
    • получить доступ к файлу, используя путь /home/airflow/gcs/dags/<path>/<to>/<file>.csv
  • Более сложный способ (используйте существующий оператор в качестве примера)
    • создать gcs_hook
    • run GoogleCloudStorageHook.download(bucket, object)
    • (необязательно) сохранить строку байтов как NamedTemporaryFile
    • прочитать этот файл или байтовую строку в панды
...