Я изучаю обработку асимметрии данных во Flink и как я могу изменить низкоуровневое управление физическим разделом для обеспечения равномерной обработки кортежей.Я создал искусственные асимметричные источники данных и собираюсь обработать (агрегировать) их через окно.Вот полный код .
streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.rebalance() // or .rescale() .shuffle()
.keyBy(new StationPlatformKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.setParallelism(4)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;
. По данным панели управления Flink, я не вижу большой разницы между .shuffle()
, .rescale()
и .rebalance()
.Хотя в документации сказано, что преобразование rebalance () больше подходит для перекоса данных.
После этого я попытался использовать .partitionCustom(partitioner, "someKey")
.Однако, к моему удивлению, я не смог использовать setParallelism (4) для оконной операции.Документация гласит:
Примечание. Эта операция по своей сути не параллельна, поскольку все элементы должны проходить через один и тот же экземпляр оператора.
Я не понимаю, почему.Если мне разрешено делать partitionCustom
, почему я не могу использовать параллелизм после этого?Вот полный код .
streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector())
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;
Спасибо, Фелипе