Я создал простое приложение Flink, которое читает журналы из темы Kafka, объединяет их в окне, продолжительность которого составляет 2 секунды, с водяным знаком 1 секунда. Затем я отправляю результаты в другую тему Кафки. Приложение развернуто в кластере.
Я потребляю кафку вот так:
Map<String, String> parameters = new HashMap<String, String>();
parameters.put("bootstrap.servers", bootstrapServers);
parameters.put("group.id", groupId);
parameters.put("zookeeper.connect", zookeeperConnect);
parameters.put("topic", inputTopic);
parameters.put("auto.offset.reset", "earliest");
ParameterTool parameterTool = ParameterTool.fromMap(parameters);
return env.addSource(new FlinkKafkaConsumer011<>(parameterTool.getRequired("topic"),
new SimpleStringSchema(),
parameterTool.getProperties()));
Агрегация производится с:
DataStream<AggregatedMeasures> aggregatedStream = messageStream
// Extract the timestamp from the object
.assignTimestampsAndWatermarks(new TimestampExtractor())
//Key for the Aggregation
.keyBy(new KeySelector<MeasureData, Tuple2<String, Timestamp>>() {
@Override
public Tuple2<String, Timestamp> getKey(MeasureData value) throws Exception {
return Tuple2.of(value.Id(), value.getEventTimestamp());
}
})
//Set the Time Window Duration
.timeWindow(Time.seconds(windowDuration))
//Aggregate
.aggregate(new AggregateMeasureFunction());
Я выдаю Кафке так:
producer = new FlinkKafkaProducer011<>(
bootstrapServers, // broker list
outputTopic, // target topic
new AggregatedMeasuresJSONSerializer()); // serialization schema
producer.setWriteTimestampToKafka(true);
messageStream.addSink(producer); // DataStream<AggregatedMeasures>
Я проверил это, создав во входной теме миллион журналов. Этот продюсер написан на Python медленно и Flink хорошо работает вживую.
Когда я пытаюсь прочитать ту же тему ввода, которая уже заполнена миллионами журналов, производитель Flink читает журналы, но не выводит все результаты, только часть из них.
Это противодавление? Я не понимаю это поведение.
Я использую Java 8, Flink 1.4 и YARN.