Pyspark: чтение файлов свойств в HDFS с помощью configParser - PullRequest
0 голосов
/ 10 апреля 2019

Я использую ConfigParser для чтения значений ключей, которые передаются моей программе pyspark. Код работает нормально, когда я выполняю с краевого узла кластера hadoop, с файлом конфигурации в локальном каталоге краевого узла. Это не так, если файл конфигурации загружен в путь hdfs, и я пытаюсь получить доступ к нему с помощью парсера.

Файл конфигурации para.conf имеет содержимое ниже

[tracker]
port=9801

В режиме локального клиента, с para.conf в локальном каталоге, для доступа к значениям, которые я использую ниже.

from ConfigParser import SafeConfigParser
parser = SafeConfigParser()
parser.read("para.conf")
myport = parser.get('tracker', 'port')

Выше работает отлично ...

В кластере Hadoop: Загруженный файл para.conf в путь к каталогу hdfs bdc / para.conf

parser.read("hdfs://clusternamenode:8020/bdc/para.conf")

это ничего не возвращает, как и нижеследующее, избегая ..

parser.read("hdfs:///clusternamenode:8020//bdc//para.conf")

Хотя с помощью sqlCOntext я могу прочитать этот файл, который возвращает действительный rdd.

sc.textFile("hdfs://clusternamenode:8020/bdc/para.conf")

хотя я не уверен, что использование configParser может извлечь значения ключей из этого ..

Может кто-нибудь посоветовать, можно ли использовать configParser для чтения файлов из hdfs? Или есть альтернатива?

1 Ответ

0 голосов
/ 12 апреля 2019

Я скопировал большую часть кода, который вы указали в комментариях.Вы были действительно близки к решению.Ваша проблема заключалась в том, что sc.textFile создает строку в rdd для каждого символа новой строки.Когда вы вызываете .collect (), вы получаете список строк для каждой строки вашего документа.StringIO не ожидает список, он ожидает строку, и поэтому вы должны восстановить предыдущую структуру документа из вашего списка.См. Рабочий пример ниже:

import ConfigParser 
import StringIO 
credstr = sc.textFile("hdfs://clusternamenode:8020/bdc/cre.conf").collect() 
buf = StringIO.StringIO("\n".join(credstr)) 
parse_str = ConfigParser.ConfigParser() 
parse_str.readfp(buf) 
parse_str.get('tracker','port') 

Вывод:

'9801'
...