Проблемы с агрегацией данных из Кафки в Flink - PullRequest
0 голосов
/ 28 июня 2018

Я создал простое приложение Flink, которое читает журналы из темы Kafka, объединяет их в окне, продолжительность которого составляет 2 секунды, с водяным знаком 1 секунда. Затем я отправляю результаты в другую тему Кафки. Приложение развернуто в кластере.

Я потребляю кафку вот так:

Map<String, String> parameters = new HashMap<String, String>();
    parameters.put("bootstrap.servers", bootstrapServers);
    parameters.put("group.id", groupId);
    parameters.put("zookeeper.connect", zookeeperConnect);
    parameters.put("topic", inputTopic);
    parameters.put("auto.offset.reset", "earliest");

ParameterTool parameterTool = ParameterTool.fromMap(parameters);

return env.addSource(new FlinkKafkaConsumer011<>(parameterTool.getRequired("topic"),
    new SimpleStringSchema(),
    parameterTool.getProperties()));

Агрегация производится с:

DataStream<AggregatedMeasures> aggregatedStream = messageStream

    // Extract the timestamp from the object
    .assignTimestampsAndWatermarks(new TimestampExtractor())

    //Key for the Aggregation
    .keyBy(new KeySelector<MeasureData, Tuple2<String, Timestamp>>() {
      @Override
      public Tuple2<String, Timestamp> getKey(MeasureData value) throws Exception {
        return Tuple2.of(value.Id(), value.getEventTimestamp());
      }
    })

    //Set the Time Window Duration
    .timeWindow(Time.seconds(windowDuration))

    //Aggregate
    .aggregate(new AggregateMeasureFunction());

Я выдаю Кафке так:

producer = new FlinkKafkaProducer011<>(
    bootstrapServers, // broker list
    outputTopic, // target topic
    new AggregatedMeasuresJSONSerializer()); // serialization schema

producer.setWriteTimestampToKafka(true);

messageStream.addSink(producer); // DataStream<AggregatedMeasures>

Я проверил это, создав во входной теме миллион журналов. Этот продюсер написан на Python медленно и Flink хорошо работает вживую.

Когда я пытаюсь прочитать ту же тему ввода, которая уже заполнена миллионами журналов, производитель Flink читает журналы, но не выводит все результаты, только часть из них.

Это противодавление? Я не понимаю это поведение.

Я использую Java 8, Flink 1.4 и YARN.

...