Заявитель : Это будет долго и утомительно. Но, учитывая обстоятельства, я постараюсь сделать его как можно более общим и воспроизводимым.
Учитывая требование отсутствия внешних библиотек (кроме pandas
?), Выбор не должен быть. Я предлагаю максимально использовать WebHDFS
.
AFAIK, установка HDFS , по умолчанию включает установку WebHDFS . Следующее решение в значительной степени опирается на WebHDFS .
Первый Шаг
Для начала вы должны знать WebHDFS URL. WebHDFS установлен на HDFS Namenode (s) , а порт по умолчанию - 50070 .
Поэтому мы начинаем с http://[namenode_ip]:50070/webhdfs/v1/
, где /webhdfs/v1
/ - это общий URL для всех.
Для примера, давайте примем это как http://192.168.10.1:50070/web/hdfs/v1
.
Второй шаг
Обычно можно использовать curl
для просмотра содержимого каталога HDFS.
Подробное описание см. В API REST WebHDFS: список каталогов
Если вы должны были использовать curl
, следующее обеспечивает FileStatuses
всех файлов в данном каталоге.
curl "http://192.168.10.1:50070/webhdfs/v1/<PATH>?op=LISTSTATUS"
^^^^^^^^^^^^ ^^^^^ ^^^^ ^^^^^^^^^^^^^
Namenode IP Port Path Operation
Как уже упоминалось, это возвращает FileStatuses в объекте JSON:
{
"FileStatuses":
{
"FileStatus":
[
{
"accessTime" : 1320171722771,
"blockSize" : 33554432,
"group" : "supergroup",
"length" : 24930,
"modificationTime": 1320171722771,
"owner" : "webuser",
"pathSuffix" : "a.patch",
"permission" : "644",
"replication" : 1,
"type" : "FILE"
},
{
"accessTime" : 0,
"blockSize" : 0,
"group" : "supergroup",
"length" : 0,
"modificationTime": 1320895981256,
"owner" : "szetszwo",
"pathSuffix" : "bar",
"permission" : "711",
"replication" : 0,
"type" : "DIRECTORY"
},
...
]
}
}
Тот же результат может быть достигнут при использовании библиотек Python по умолчанию:
import requests
my_path = '/my_path/'
curl = requests.get('http://192.168.10.1:50070/webhdfs/v1/%s?op=LISTSTATUS' % my_path)
И, как показано выше, фактический статус каждого файла на два уровня ниже результата JSON. Другими словами, чтобы получить FileStatus каждого файла:
curl.json()['FileStatuses']['FileStatus']
[
{
"accessTime" : 1320171722771,
"blockSize" : 33554432,
"group" : "supergroup",
"length" : 24930,
"modificationTime": 1320171722771,
"owner" : "webuser",
"pathSuffix" : "a.patch",
"permission" : "644",
"replication" : 1,
"type" : "FILE"
},
{
"accessTime" : 0,
"blockSize" : 0,
"group" : "supergroup",
"length" : 0,
"modificationTime": 1320895981256,
"owner" : "szetszwo",
"pathSuffix" : "bar",
"permission" : "711",
"replication" : 0,
"type" : "DIRECTORY"
},
...
]
Третий шаг
Поскольку теперь у вас есть вся необходимая информация, все, что вам нужно, - это анализ.
import os
file_paths = []
for file_status in curl.json()['FileStatuses']['FileStatus']:
file_name = file_status['pathSuffix']
# this is the file name in the queried directory
if file_name.endswith('.csv'):
# if statement is only required if the directory contains unwanted files (i.e. non-csvs).
file_paths.append(os.path.join(path, file_name))
# os.path.join asserts your result consists of absolute path
file_paths
['/my_path/file1.csv',
'/my_path/file2.csv',
...]
Последний шаг
Теперь вы знаете пути к файлам и ссылкам WebHDFS, pandas.read_csv
может справиться с остальными работами.
import pandas as pd
dfs = []
web_url = "http://192.168.10.1:50070/webhdfs/v1/%s?op=OPEN"
# ^^^^^^^
# Operation is now OPEN
for file_path in file_paths:
file_url = web_url % file_path
# http://192.168.10.1:50070/webhdfs/v1/my_path/file1.csv?op=OPEN
dfs.append(pd.read_csv(file_url))
И вот вам все импортированные .csv
и присвоенные dfs
.
Предупреждения
Если ваша HDFS настроена на HA (Высокая доступность), будет несколько namenodes и, следовательно, ваш namenode_ip
должен быть настроен соответственно: Это должен быть IP-адрес активный узел.