Apache Flink: чтение файла из HDFS - PullRequest
0 голосов
/ 18 мая 2018

Поэтому мне нужно извлечь содержимое файла, хранящегося в HDFS, и выполнить определенный анализ для него.

Дело в том, что мне даже не удается прочитать файл и записать его содержимое в другой текстфайл в моей локальной файловой системе.(Я новичок во Flink, это всего лишь тест, чтобы убедиться, что я правильно читаю файл)

Файл в HDFS представляет собой простой текстовый файл.Вот мой код:

public class readFromHdfs {

    public static void main(String[] args) throws Exception {

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> lines = env.readTextFile("hdfs://localhost:9000//test/testfile0.txt");

        lines.writeAsText("/tmp/hdfs_file.txt"); 

        env.execute("File read from HDFS");
    }
}

Нет никаких выходных данных в / tmp после того, как я его запустил.

Это действительно простой код, и я не уверен, есть ли проблема с ним, или я 'Я просто делаю что-то не так.Как я уже сказал, я совсем новичок в Flink

Кроме того, задание отображается на веб-панели инструментов как неудачное.Вот контекст журнала Flink: https://pastebin.com/rvkXPGHU

Заранее спасибо

РЕДАКТИРОВАТЬ: я решил проблему, увеличив количество слотов задач.На веб-панели инструментов отображался доступный слот для задач, и он не жаловался на то, что у него вообще недостаточно слотов, поэтому я не думал, что это может быть так.,Я прочитал содержимое из testfile0.txt, но не записал их в hdfs_file.txt.Вместо этого он создает каталог с таким именем, и внутри него 8 текстовых файлов, 6 из которых абсолютно пустые.Два других содержат testfile0.txt (большая часть находится в 1.txt, а последний кусок в 2.txt).

Хотя это не имеет большого значения, потому что содержимое файла должным образом хранитсяв DataSet, чтобы я мог продолжить анализ данных.

1 Ответ

0 голосов
/ 18 мая 2018

Это работает, как и ожидалось - вы установили параллелизм полного задания (и, следовательно, выходного формата) на 8, поэтому каждый слот создает свой собственный файл (как вы могли знать, небезопасно одновременно записывать в один файл).Если вам нужен только 1 выходной файл, вы должны writeAsText(...).setParalellis(1) переопределить свойство глобального параллелизма.

Если вы хотите получать выходные данные в локальной файловой системе вместо HDFS, вы должны явно установить протокол "file: //" впуть, потому что для Hadoop flink по умолчанию выглядит как "hdfs: //".

...