Я использую Apache Flink, пытаясь получить JSON записей из Кафки в InfluxDB, разбивая их из одной JSON записи на несколько точек InfluxDB в процессе.
Я нашел flatMap
преобразовать, и он чувствует, что он соответствует цели. Код ядра выглядит следующим образом:
DataStream<InfluxDBPoint> dataStream = stream.flatMap(new FlatMapFunction<JsonConsumerRecord, InfluxDBPoint>() {
@Override
public void flatMap(JsonConsumerRecord record, Collector<InfluxDBPoint> out) throws Exception {
Iterator<Entry<String, JsonNode>> iterator = //...
while (iterator.hasNext()) {
// extract point from input
InfluxDBPoint point = //...
out.collect(point);
}
}
});
По какой-то причине я получаю только одну из этих собранных точек, которые передаются в базу данных.
Даже когда я распечатываю все сопоставленные записи, кажется, что он работает просто отлично: dataStream.print()
выход:
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@144fd091
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@57256d1
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@28c38504
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@2d3a66b3
Не понимаю ли я flatMap
или может быть какая-то ошибка в Коннектор притока?