Flink kafka - Flink job не отправляет сообщения в разные разделы - PullRequest
0 голосов
/ 29 августа 2018

У меня есть следующая конфигурация:

  1. Одна тема кафки с 2 разделами
  2. Один экземпляр Zookeeper
  3. Один экземпляр кафки
  4. Два потребителя с одинаковым идентификатором группы

Фрагмент задания Flink:

speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName,new 
SimpleStringSchema(), props));

Сценарий 1:

Я написал задание Flink (Producer) для eclipse, которое читает файл из папки и помещает msgs в тему kafka.

Поэтому, когда я запускаю этот код, используя eclipse, он работает нормально.

Например: Если я помещаю файл с 100 записями, flink отправляет несколько сообщений в раздел 1 и несколько сообщений в раздел 2, и, следовательно, оба потребителя получают несколько сообщений.

Сценарий 2: Когда я создаю jar-код вышеупомянутого кода и запускаю его на сервере flink, flink отправляет все сообщения в один раздел, и, следовательно, только один потребитель получает все сообщения.

Я хочу, чтобы сценарий 1 использовал банку, созданную в сценарии 2.

Ответы [ 2 ]

0 голосов
/ 30 августа 2018

Для производителей Flink-Kafka добавьте «ноль» в качестве последнего параметра.

speStream.addSink(new FlinkKafkaProducer011(
    kafkaTopicName,
    new SimpleStringSchema(),
    props,
    (FlinkKafkaPartitioner) null)
);

Краткое объяснение этого состоит в том, что это отключает Flink от использования разделителя по умолчанию FlinkFixedPartitioner. Это отключение по умолчанию позволит Kafka распределять данные между своими разделами так, как считает нужным. Если это НЕ отключено, то каждый слот параллелизма / задачи, используемый для приемника, использующего FlinkKafkaProducer, будет выполнять запись только в один раздел на слот параллелизма / задачи.

0 голосов
/ 30 августа 2018

Если вы не предоставите FlinkKafkaPartitioner или явно не скажете использовать Kafka, то будет использоваться FlinkFixedPartitioner, что означает, что все события из одной задачи окажутся в одном разделе.

Чтобы использовать разделитель Кафки, используйте этот ctor:

speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName,new SimpleStringSchema(), props), Optional.empty());

Разница между запуском из IDE и Eclipse, вероятно, из-за разных настроек параллелизма или разбиения в Flink.

...