как установить DASK на Google Composer - PullRequest
0 голосов
/ 01 ноября 2018

Я пытался установить Dask на Google Composer (Airflow). Я использовал pypi (GCP UI), чтобы добавить dask и ниже требуемые пакеты (хотя я не уверен, что все google нужны, не могу найти require.txt):

 dask
 toolz
 partd
 cloudpickle
 google-cloud
 google-cloud-storage
 google-auth
 google-auth-oauthlib
 decorator

когда я запускаю свою группу обеспечения доступности баз данных с dd.read_csv («ведро gcp»), в журнале воздушного потока отображается следующая ошибка:

    [2018-10-24 22:25:12,729] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/dask/bytes/core.py", line 350, in get_fs_token_paths
    [2018-10-24 22:25:12,733] {base_task_runner.py:98} INFO - Subtask:     fs, fs_token = get_fs(protocol, options)
    [2018-10-24 22:25:12,735] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/dask/bytes/core.py", line 473, in get_fs
    [2018-10-24 22:25:12,740] {base_task_runner.py:98} INFO - Subtask:     "Need to install `gcsfs` library for Google Cloud Storage support\n"
    [2018-10-24 22:25:12,741] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/dask/utils.py", line 94, in import_required
    [2018-10-24 22:25:12,748] {base_task_runner.py:98} INFO - Subtask:     raise RuntimeError(error_msg)
    [2018-10-24 22:25:12,751] {base_task_runner.py:98} INFO - Subtask: RuntimeError: Need to install `gcsfs` library for Google Cloud Storage support
    [2018-10-24 22:25:12,756] {base_task_runner.py:98} INFO - Subtask:     conda install gcsfs -c conda-forge
    [2018-10-24 22:25:12,758] {base_task_runner.py:98} INFO - Subtask:     or
    [2018-10-24 22:25:12,762] {base_task_runner.py:98} INFO - Subtask:     pip install gcsfs

, поэтому я попытался установить gcsfs, используя pypi, но получил следующую ошибку воздушного потока:

{
  insertId:  "17ks763f726w1i"  
  logName:  "projects/xxxxxxxxx/logs/airflow-worker"  
  receiveTimestamp:  "2018-10-25T15:42:24.935880717Z"  
  resource: {…}  
  severity:  "ERROR"  
  textPayload:  "Traceback (most recent call last):
  File "/usr/local/bin/gcsfuse", line 7, in <module>
   from gcsfs.cli.gcsfuse import main
  File "/usr/local/lib/python2.7/site- 
    packages/gcsfs/cli/gcsfuse.py", line 3, in <module>
     fuse import FUSE
    ImportError: No module named fuse
 "  
  timestamp:  "2018-10-25T15:41:53Z"  
}

кажется, что он заперт в цикле необходимых пакетов !! не уверен, что я здесь что-то пропустил? есть мысли?

1 Ответ

0 голосов
/ 07 ноября 2018

Вам не нужно добавлять хранилище в ваши пакеты PyPi, оно уже установлено . Я запустил dag (image-version: composer-1.3.0-airflow-1.10.0), записывающий версию предустановленного пакета, и кажется, что это 1.13.0. Я также добавил в свой журнал следующее, чтобы повторить ваш случай:

import dask.dataframe as dd
def read_csv_dask():
    df = dd.read_csv('gs://gcs_path/data.csv')
    logging.info("csv from gs://gcs_path/ read alright")

Прежде всего, я добавил через пользовательский интерфейс следующие зависимости:

dask==0.20.0
toolz==0.9.0
partd==0.3.9
cloudpickle==0.6.1

Соответствующая задача завершилась с тем же сообщением, что и у вас («Нужно установить библиотеку gcsfs для поддержки Google Cloud Storage»), после чего я вернулся в интерфейс и попытался добавить gcsfs==0.1.2. Это никогда не удавалось. Тем не менее, я не получил ошибку, которую вы сделали, я вместо этого неоднократно терпел неудачу с «Composer Backend timed out».

На данный момент, вы можете рассмотреть следующие альтернативы:

1) Установите gcsfs с pip в BashOperator. Это не оптимально, так как вы будете устанавливать gcsfs каждый раз, когда запускается даг.

2) Использовать другую библиотеку. Что ты делаешь с этим CSV? Если вы загрузите его в каталог gs://composer_gcs_bucket/data/ (отметьте здесь ), вы можете прочитать его, например, с помощью. стандартная библиотека csv выглядит так:

import csv
def read_csv():
    f=open('/home/airflow/gcs/data/data.csv', 'rU')
    reader = csv.reader(f)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...