Flink Data Stream CSV Writer не записывает данные в файл CSV - PullRequest
0 голосов
/ 03 января 2019

Я новичок в Apache Flink и пытаюсь изучить потоки данных. Я читаю данные студента, которые имеют 3 столбца (имя, тема и метки) из файла CSV. Я применил фильтр к отметкам и выбрал только те записи, где отметки> 40. Я пытаюсь записать эти данные в файл CSV, но программа работает успешно, и файл CSV остается пустым. Данные не записываются в CSV-файл.

Я пытался с другим синтаксисом для записи файла CSV, но ни один из них не работал для меня. Я запускаю это локально через затмение. Запись в текстовый файл работает нормально.

DataStream<String> text = env.readFile(format, params.get("input"), 
FileProcessingMode.PROCESS_CONTINUOUSLY,100);
DataStream<String> filtered = text.filter(new FilterFunction<String>(){
public boolean filter(String value) {
    String[] tokens = value.split(",");
    return Integer.parseInt(tokens[2]) >= 40;
}
});
filtered.writeAsText("testFilter",WriteMode.OVERWRITE);
DataStream<Tuple2<String, Integer>> tokenized = filtered
.map(new MapFunction<String, Tuple2<String, Integer>>(){
public Tuple2<String, Integer> map(String value) throws Exception {
    return new Tuple2("Test", Integer.valueOf(1));
}
});
tokenized.print(); 
tokenized.writeAsCsv("file:///home/Test/Desktop/output.csv", 
WriteMode.OVERWRITE, "/n", ",");
try {
env.execute();
} catch (Exception e1) {
e1.printStackTrace();
}
}
}

Ниже мой входной формат CSV:

Name1,Subj1,30
Name1,Subj2,40
Name1,Subj3,40
Name1,Subj4,40

Tokenized.print () печатает все правильные записи.

Ответы [ 2 ]

0 голосов
/ 20 февраля 2019

Вы должны удалить tokenized.print(); до tokenized.writeAsCsv();.

Это будет использовать данные print();.

0 голосов
/ 03 января 2019

Я немного поэкспериментировал и обнаружил, что эта работа отлично работает:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WriteCSV {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        env.fromElements(new Tuple2<>("abc", 1), new Tuple2<>("def", 2))
                .writeAsCsv("file:///tmp/test.csv", FileSystem.WriteMode.OVERWRITE, "\n", ",");

        env.execute();
    }
}

Если я не установлю параллелизм на 1, тогда результаты будут другими.В этом случае test.csv - это каталог, содержащий четыре файла, каждый из которых написан одной из четырех параллельных подзадач.

Я не уверен, что не так в вашем случае, но, возможно, вы можете работать в обратном направлении из этого примера(при условии, что это работает для вас).

...