Что может быть объяснением этой ошибки «pyarrow.lib.ArrowIOError: Файл HDFS не существует» при попытке чтения файлов в формате hdf с помощью Dask? - PullRequest
0 голосов
/ 30 апреля 2019

Я использую Dask Distributed и пытаюсь создать кадр данных из CSV, хранящегося в HDFS.Я полагаю, что соединение с HDFS успешно, так как я могу напечатать имена столбцов данных.Тем не менее, я получаю следующую ошибку, когда пытаюсь использовать функцию len или любую другую функцию на фрейме данных:

pyarrow.lib.ArrowIOError: HDFS file does not exist: /user/F43479/trip_data_v2.csv

Я не понимаю, почему у меня есть эта ошибка,Мне бы хотелось узнать ваше мнение.

Вот мой код:

# IMPORTS
import dask.dataframe as dd
from dask.distributed import Client
import pyarrow as pa
from pyarrow import csv
from dask import compute,config
import os
import subprocess

# GET HDFS CLASSPATH
classpath = subprocess.Popen(["/usr/hdp/current/hadoop-client/bin/hdfs", "classpath", "--glob"], stdout=subprocess.PIPE).communicate()[0]

# CONFIGURE ENVIRONMENT VARIABLES
os.environ["HADOOP_HOME"] = "/usr/hdp/current/hadoop-client"
os.environ["JAVA_HOME"] = "/home/G60070/installs/jdk1.8.0_201/"
os.environ["CLASSPATH"] = classpath.decode("utf-8")
os.environ["ARROW_LIBHDFS_DIR"] = "/usr/hdp/2.6.5.0-292/usr/lib/"

# LAUNCH DASK DISTRIBUTED
client = Client('10.22.104.37:8786')

# SET HDFS CONNEXION
config.set(hdfs_driver='pyarrow', host='xxxxx.xxx.xx.fr', port=8020)

# READ FILE ON HDFS
folder = 'hdfs://xxxxx.xxx.xx.fr:8020/user/F43479/'
filepath = folder+'trip_data_v2.csv'
df = dd.read_csv(filepath)

# TREATMENTS ON FILE
print(df.columns)# this works
print(len(df))# produces an error

Вот содержимое моего репозитория HDFS:

[F43479@xxxxx dask_tests]$ hdfs dfs -ls /user/F43479/
Found 9 items
-rw-r-----   3 F43479 hdfs            0 2019-03-07 16:42 /user/F43479/-
drwx------   - F43479 hdfs            0 2019-04-03 02:00 /user/F43479/.Trash
drwxr-x---   - F43479 hdfs            0 2019-03-13 16:53 /user/F43479/.hiveJars
drwxr-x---   - F43479 hdfs            0 2019-03-13 16:52 /user/F43479/hive
drwxr-x---   - F43479 hdfs            0 2019-03-15 13:23 /user/F43479/nyctaxi_trip_data
-rw-r-----   3 F43479 hdfs           36 2019-04-15 11:13 /user/F43479/test.csv
-rw-r-----   3 F43479 hdfs  50486731416 2019-03-26 17:37 /user/F43479/trip_data.csv
-rw-r-----   3 F43479 hdfs   5097056230 2019-04-15 13:57 /user/F43479/trip_data_v2.csv
-rw-r-----   3 F43479 hdfs 504867312828 2019-04-02 11:15 /user/F43479/trip_data_x10.csv

И, наконец,полный результат выполнения кода:

Index(['vendor_id', 'passenger_count', 'trip_time_in_secs', 'trip_distance'], dtype='object')
Traceback (most recent call last):
  File "dask_pa_hdfs.py", line 32, in <module>
    print(len(df))
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/dask/dataframe/core.py", line 438, in __len__
    split_every=False).compute()
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/dask/base.py", line 397, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/distributed/client.py", line 2321, in get
    direct=direct)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/distributed/client.py", line 1655, in gather
    asynchronous=asynchronous)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/distributed/client.py", line 673, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/distributed/utils.py", line 277, in sync
    six.reraise(*error[0])
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/six.py", line 693, in reraise
    raise value
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/distributed/utils.py", line 262, in f
    result[0] = yield future
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/distributed/client.py", line 1500, in _gather
    traceback)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/dask/bytes/core.py", line 133, in read_block_from_file
    with copy.copy(lazy_file) as f:
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/dask/bytes/core.py", line 177, in __enter__
    f = SeekableFile(self.fs.open(self.path, mode=mode))
  File "/opt/anaconda3/envs/python3-dask/lib/python3.7/site-packages/dask/bytes/pyarrow.py", line 37, in open
    return self.fs.open(path, mode=mode, **kwargs)
  File "pyarrow/io-hdfs.pxi", line 431, in pyarrow.lib.HadoopFileSystem.open
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: HDFS file does not exist: /user/F43479/trip_data_v2.csv

Ответы [ 2 ]

0 голосов
/ 13 мая 2019

Я решил проблему.Это было связано с разрешениями на доступ к HDFS.Я работаю над кластером Kerberised HDFS и запустил процесс Dask Scheduler на пограничном узле и рабочих процессах на узлах данных .Для доступа к HDFS pyarrow необходимы 2 вещи:

  • Он должен быть установлен в планировщике, и все рабочие
  • Переменные среды также должны быть настроены на всех узлах

Затем для доступа к HDFS запущенные процессы должны пройти проверку подлинности через Kerberos.При запуске кода из процесса планировщика я могу подключиться к HDFS, потому что мой сеанс проходит проверку подлинности через Kerberos.Вот почему я могу получить информацию о столбцах файла CSV.Однако, поскольку рабочие процессы не были аутентифицированы, они не могли получить доступ к HDFS, что вызвало ошибку.Чтобы решить эту проблему, нам пришлось остановить рабочие процессы, изменить скрипт, используемый для их запуска, чтобы он включал команду kerberos для аутентификации в HDFS (kinit что-то), а затем перезапустить рабочие процессы.Пока это работает, но это означает, что Dask не совместим с кластером Kerberised.Используя нашу конфигурацию, все пользователи имеют одинаковые разрешения для HDFS при запуске вычисления с рабочего.Я думаю, что это не совсем безопасная практика

0 голосов
/ 30 апреля 2019

Вы тщательно настроили среду в своем локальном процессе, содержащем клиента, чтобы он мог взаимодействовать с HDFS. Для обнаружения столбцов этого достаточно, поскольку Dask делает это заранее, начиная с клиентского процесса и первых нескольких строк данных. Тем не менее:

client = Client('10.22.104.37:8786')

ваш планировщик и работники живут в другом месте, и у них нет переменных среды, которые вы им предоставили. Когда вы запускаете ваши задачи, рабочие не знают, как найти файл.

Что вам нужно сделать, так это настроить окружающую среду и на рабочих. Это можно сделать до того, как они будут запущены, или когда-то уже:

def setenv():
    import os
    os.environ["HADOOP_HOME"] = "/usr/hdp/current/hadoop-client"
    os.environ["JAVA_HOME"] = "/home/G60070/installs/jdk1.8.0_201/"
    os.environ["CLASSPATH"] = classpath.decode("utf-8")
    os.environ["ARROW_LIBHDFS_DIR"] = "/usr/hdp/2.6.5.0-292/usr/lib/"

client.run(setenv)

(должен возвращаться с набором None от каждого работника)

Обратите внимание, что, если новые работники подключаются динамически, каждому из них необходимо будет запустить эту функцию перед доступом к HDFS.

...