Как сделать FlatMap для базы данных в Apache Flink? - PullRequest
1 голос
/ 10 января 2020

Я использую Apache Flink, пытаясь получить JSON записей из Кафки в InfluxDB, разбивая их из одной JSON записи на несколько точек InfluxDB в процессе.

Я нашел flatMap преобразовать, и он чувствует, что он соответствует цели. Код ядра выглядит следующим образом:

DataStream<InfluxDBPoint> dataStream = stream.flatMap(new FlatMapFunction<JsonConsumerRecord, InfluxDBPoint>() {
    @Override
    public void flatMap(JsonConsumerRecord record, Collector<InfluxDBPoint> out) throws Exception {
        Iterator<Entry<String, JsonNode>> iterator = //...

        while (iterator.hasNext()) {
            // extract point from input
            InfluxDBPoint point = //...

            out.collect(point);
        }
    }
});

По какой-то причине я получаю только одну из этих собранных точек, которые передаются в базу данных.

Даже когда я распечатываю все сопоставленные записи, кажется, что он работает просто отлично: dataStream.print() выход:

org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@144fd091
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@57256d1
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@28c38504
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@2d3a66b3

Не понимаю ли я flatMap или может быть какая-то ошибка в Коннектор притока?

1 Ответ

1 голос
/ 10 января 2020

Проблема на самом деле была связана с тем фактом, что ряд (определенный его набором тегов и измерением как видно здесь ) в Influx может иметь только одну точку за время , поэтому даже если мои поля отличались, конечная точка переписала все предыдущие точки с одинаковым значением времени.

...