Python: Как импортировать список файлов в каталоге из HDFS - PullRequest
0 голосов
/ 22 января 2019

Я пытаюсь импортировать список файлов из HDFS в python.

Как это сделать из HDFS:

path =r'/my_path'
allFiles = glob.glob(path + "/*.csv")

df_list = []
for file_ in allFiles:
    df = pd.read_csv(file_,index_col=None, header=0,sep=';')    
    df_list.append(df)

Я думаю, подпроцесс.Попен добивается цели, но как извлечь только имя файла?

import subprocess
p = subprocess.Popen("hdfs dfs -ls /my_path/ ",
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT)


for line in p.stdout.readlines():
    print(line)

Вывод выглядит так:

b'Found 32 items\n'
b'-rw-------   3 user hdfs   42202621 2019-01-21 10:05 /my_path/file1.csv\n'
b'-rw-------   3 user hdfs   99320020 2019-01-21 10:05 /my_path/file2.csv\n'

1 Ответ

0 голосов
/ 24 января 2019

Заявитель : Это будет долго и утомительно. Но, учитывая обстоятельства, я постараюсь сделать его как можно более общим и воспроизводимым.


Учитывая требование отсутствия внешних библиотек (кроме pandas?), Выбор не должен быть. Я предлагаю максимально использовать WebHDFS.

AFAIK, установка HDFS , по умолчанию включает установку WebHDFS . Следующее решение в значительной степени опирается на WebHDFS .

Первый Шаг

Для начала вы должны знать WebHDFS URL. WebHDFS установлен на HDFS Namenode (s) , а порт по умолчанию - 50070 .

Поэтому мы начинаем с http://[namenode_ip]:50070/webhdfs/v1/, где /webhdfs/v1 / - это общий URL для всех.

Для примера, давайте примем это как http://192.168.10.1:50070/web/hdfs/v1.

Второй шаг

Обычно можно использовать curl для просмотра содержимого каталога HDFS. Подробное описание см. В API REST WebHDFS: список каталогов

Если вы должны были использовать curl, следующее обеспечивает FileStatuses всех файлов в данном каталоге.

curl "http://192.168.10.1:50070/webhdfs/v1/<PATH>?op=LISTSTATUS"
             ^^^^^^^^^^^^ ^^^^^             ^^^^  ^^^^^^^^^^^^^
             Namenode IP  Port              Path  Operation

Как уже упоминалось, это возвращает FileStatuses в объекте JSON:

{
  "FileStatuses":
  {
    "FileStatus":
    [
      {
        "accessTime"      : 1320171722771,
        "blockSize"       : 33554432,
        "group"           : "supergroup",
        "length"          : 24930,
        "modificationTime": 1320171722771,
        "owner"           : "webuser",
        "pathSuffix"      : "a.patch",
        "permission"      : "644",
        "replication"     : 1,
        "type"            : "FILE"
      },
      {
        "accessTime"      : 0,
        "blockSize"       : 0,
        "group"           : "supergroup",
        "length"          : 0,
        "modificationTime": 1320895981256,
        "owner"           : "szetszwo",
        "pathSuffix"      : "bar",
        "permission"      : "711",
        "replication"     : 0,
        "type"            : "DIRECTORY"
      },
      ...
    ]
  }
}

Тот же результат может быть достигнут при использовании библиотек Python по умолчанию:

import requests

my_path = '/my_path/'
curl = requests.get('http://192.168.10.1:50070/webhdfs/v1/%s?op=LISTSTATUS' % my_path)

И, как показано выше, фактический статус каждого файла на два уровня ниже результата JSON. Другими словами, чтобы получить FileStatus каждого файла:

curl.json()['FileStatuses']['FileStatus'] 

[
  {
    "accessTime"      : 1320171722771,
    "blockSize"       : 33554432,
    "group"           : "supergroup",
    "length"          : 24930,
    "modificationTime": 1320171722771,
    "owner"           : "webuser",
    "pathSuffix"      : "a.patch",
    "permission"      : "644",
    "replication"     : 1,
    "type"            : "FILE"
  },
  {
    "accessTime"      : 0,
    "blockSize"       : 0,
    "group"           : "supergroup",
    "length"          : 0,
    "modificationTime": 1320895981256,
    "owner"           : "szetszwo",
    "pathSuffix"      : "bar",
    "permission"      : "711",
    "replication"     : 0,
    "type"            : "DIRECTORY"
  },
  ...
]

Третий шаг

Поскольку теперь у вас есть вся необходимая информация, все, что вам нужно, - это анализ.

import os

file_paths = []
for file_status in curl.json()['FileStatuses']['FileStatus']:
    file_name = file_status['pathSuffix']
    # this is the file name in the queried directory
    if file_name.endswith('.csv'):
    # if statement is only required if the directory contains unwanted files (i.e. non-csvs).
        file_paths.append(os.path.join(path, file_name))
        # os.path.join asserts your result consists of absolute path

file_paths
['/my_path/file1.csv',
 '/my_path/file2.csv',
 ...]

Последний шаг

Теперь вы знаете пути к файлам и ссылкам WebHDFS, pandas.read_csv может справиться с остальными работами.

import pandas as pd

dfs = []
web_url = "http://192.168.10.1:50070/webhdfs/v1/%s?op=OPEN"
#                                                  ^^^^^^^
#                                    Operation is now OPEN
for file_path in file_paths:
    file_url = web_url % file_path
    # http://192.168.10.1:50070/webhdfs/v1/my_path/file1.csv?op=OPEN
    dfs.append(pd.read_csv(file_url))

И вот вам все импортированные .csv и присвоенные dfs.

Предупреждения

Если ваша HDFS настроена на HA (Высокая доступность), будет несколько namenodes и, следовательно, ваш namenode_ip должен быть настроен соответственно: Это должен быть IP-адрес активный узел.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...