Каковы другие варианты обработки перекосов данных во Flink? - PullRequest
0 голосов
/ 10 апреля 2019

Я изучаю обработку асимметрии данных во Flink и как я могу изменить низкоуровневое управление физическим разделом для обеспечения равномерной обработки кортежей.Я создал искусственные асимметричные источники данных и собираюсь обработать (агрегировать) их через окно.Вот полный код .

streamTrainsStation01.union(streamTrainsStation02)
        .union(streamTicketsStation01).union(streamTicketsStation02)
        // map the keys
        .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
        .rebalance() // or .rescale() .shuffle()
        .keyBy(new StationPlatformKeySelector())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
        .apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
        .setParallelism(4)
        .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
        .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
        ;

. По данным панели управления Flink, я не вижу большой разницы между .shuffle(), .rescale() и .rebalance().Хотя в документации сказано, что преобразование rebalance () больше подходит для перекоса данных.

После этого я попытался использовать .partitionCustom(partitioner, "someKey").Однако, к моему удивлению, я не смог использовать setParallelism (4) для оконной операции.Документация гласит:

Примечание. Эта операция по своей сути не параллельна, поскольку все элементы должны проходить через один и тот же экземпляр оператора.

Я не понимаю, почему.Если мне разрешено делать partitionCustom, почему я не могу использовать параллелизм после этого?Вот полный код .

streamTrainsStation01.union(streamTrainsStation02)
        .union(streamTicketsStation01).union(streamTicketsStation02)
        // map the keys
        .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
        .partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector())
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
        .apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
        .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
        .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
        ;

Спасибо, Фелипе

1 Ответ

0 голосов
/ 11 апреля 2019

Я получил ответ из списка рассылки FLink-user.В основном использование keyBy() после rebalance() убивает весь эффект, который пытается сделать rebalance().Первое (специальное) решение, которое я нашел, - это создание составного ключа, который заботится о перекосе ключа.

public class CompositeSkewedKeyStationPlatform implements Serializable {
    private static final long serialVersionUID = -5960601544505897824L;
    private Integer stationId;
    private Integer platformId;
    private Integer skewParameter;
}

Я использую его в функции map перед использованием keyBy().

public class StationPlatformSkewedKeyMapper
        extends RichMapFunction<MqttSensor, Tuple2<CompositeSkewedKeyStationPlatform, MqttSensor>> {
    private SkewParameterGenerator skewParameterGenerator;

    public StationPlatformSkewedKeyMapper() {
        this.skewParameterGenerator = new SkewParameterGenerator(10);
    }

    @Override
    public Tuple2<CompositeSkewedKeyStationPlatform, MqttSensor> map(MqttSensor value) throws Exception {
        Integer platformId = value.getKey().f2;
        Integer stationId = value.getKey().f4;
        Integer skewParameter = 0;

        if (stationId.equals(new Integer(2)) && platformId.equals(new Integer(3))) {
            skewParameter = this.skewParameterGenerator.getNextItem();
        }
        CompositeSkewedKeyStationPlatform compositeKey = new CompositeSkewedKeyStationPlatform(stationId, platformId,
                skewParameter);
        return Tuple2.of(compositeKey, value);
    }
}

вот мое полное решение .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...