Python-код для балок с поддержкой hdfs в конвейере - PullRequest
0 голосов
/ 16 января 2019

Я запускаю здесь пример настроения для преобразования тензорного потока.https://github.com/tensorflow/transform/blob/master/examples/sentiment_example.py

Для функции fn ReadAndShuffleData (), определенной в строке 78-98, возможно ли, что аналогичным образом я могу загружать файлы, но из HDFS, а не из GCS?

Я пробовалцелый день с несколькими API лучей (beams-2.8.0), но не удалось, и я думаю, что наиболее перспективным является использование beams.io.hadoopfilesystem.Но этот fn фактически создает файл-объект python и не может быть прочитан с использованием beams.io.ReadFromText () в конвейере луча.

Я также правильно передал HadoopFileSystemPipelineOptions.Кто-нибудь может показать мне направление для решения проблемы или 2/3-строчные фрагменты кода или обходной путь?Большое спасибо!

ps hadoop 2.7.7, лучи 2.8 и данные загружены правильно.

Я думаю, что здесь может не хватить теоретического понимания, любые ссылки будут оценены!

1 Ответ

0 голосов
/ 28 мая 2019

Вы можете использовать преобразование apache_beam.Create:

Начальная подпись: beam.Create (self, values, reshuffle = True)

Docstring: преобразование, которое создает PCollection из итерируемого.

import apache_beam as beam
from apache_beam.options.pipeline_options import HadoopFileSystemOptions
from apache_beam.io.hadoopfilesystem import HadoopFileSystem

HDFS_HOSTNAME = 'foo.hadoop.com'
HDFS_PORT = 50070
hdfs_client_options = HadoopFileSystemOptions(hdfs_host=HDFS_HOSTNAME, hdfs_port=HDFS_PORT, hdfs_user="foobar")
hdfs_client = HadoopFileSystem(hdfs_client_options)

input_file_hdfs = "hdfs://foo/bar.csv"
f = hdfs_client.open(input_file_hdfs)

p = beam.Pipeline(options=PipelineOptions())
lines = p | 'ReadMyFile' >> beam.Create(f)
res = lines | "WriteMyFile" >> beam.io.WriteToText("./bar", ".csv")
p.run()
...