Flink карта потока CSV-файл в Tuple - PullRequest
0 голосов
/ 25 августа 2018

Я пытаюсь отобразить файл CSV, уже использованный Flink и созданный Kafka, в Tuple4. Мой CSV-файл имеет 4 столбца, и я хочу отобразить каждую строку в Tuple4. Проблема в том, что я не знаю, как реализовать функции map () и csv2Tuple.

Вот где я застрял:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

ParameterTool parameterTool = ParameterTool.fromArgs(ARGS);

DataStreamSource<String> myConsumer = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"),
            new SimpleStringSchema(), parameterTool.getProperties()));

DataStream<Tuple4<Integer, Integer, Integer, Integer>> streamTuple = myConsumer.map(new csv2Tuple());
public static class csv2Tuple implements MapFunction<...> {public void map(){...}}

Я также хотел бы разобрать строку для целочисленных элементов в кортеже.

1 Ответ

0 голосов
/ 25 августа 2018

Предположим, что вы создаете каждую строку файла csv как сообщение Kafka и используете его, используя соединитель Flink Kafka. Вы должны просто разделить каждое использованное сообщение с помощью , (потому что это файл csv).

DataStream<Tuple4<Integer, Integer, Integer, Integer,>> streamTuple = myConsumer.map(new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
            @Override
            public Tuple4<Integer, Integer, Integer, Integer> map(String str) throws Exception {
                String[] temp = str.split(",");
                return new Tuple4<>(
                        Integer.parseInt(temp[0]),
                        Integer.parseInt(temp[1]),
                        Integer.parseInt(temp[2]),
                        Integer.parseInt(temp[3])
                );

            }
        });
...