Apache Flink writeAsCsv не имеет содержимого. - PullRequest
0 голосов
/ 12 декабря 2018

Я пытаюсь сохранить свой поток данных в формате CSV в виде кортежа из двух значений Long (метка времени появления и метка времени системы).Я создаю DataStream значений Tuple следующим образом.

DataStream<Tuple2<Long, Long>> csvStream  = messageStream.map(new MapFunction<GraphStreamItem, Tuple2<Long, Long>>() {
        @Override
        public Tuple2<Long, Long> map(GraphStreamItem graphStreamItem) throws Exception {
            return Tuple2.of(graphStreamItem.getAppTimestamp(), graphStreamItem.getSysTimestamp());
        }
    });

Теперь я пытаюсь сохранить его как csv следующим образом:

csvStream.writeAsCsv("path/to/save.csv", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

Но файл csv кажется пустым всегда.Я попытался распечатать csvStream, и два значения кортежа видны, когда я это делаю.Любая помощь будет принята с благодарностью.

Заранее спасибо :).

...