Кастомная перегородка в пульсар - PullRequest
0 голосов
/ 28 февраля 2020

Невозможно отправить сообщение от производителя на пульсар, когда для производителя задано значение customPartition (см. Код ниже).

Producer<byte[]> producer = client.newProducer()
                .topic(pulsarTopic)
                //.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .messageRoutingMode(MessageRoutingMode.CustomPartition)
                .messageRouter( new MessageRounterImpl())
                .create();

Код для отправки сообщения:

 producer.send(msg);

MessageRouterImpl имеет случайным образом генерирует число в диапазоне от 0 до 5, как показано ниже: код

public class MessageRounterImpl  implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
   Random r = new Random();
   return r.nextInt((0 - 5) + 1);

}

}

У меня вопрос, почему я не могу отправить сообщение от производителя с CustomPartition и почему я получаю ниже сообщения журнала

Пакетная обработка сообщений из пакетного контейнера из потока таймера

Пакетная обработка сообщений из пакетного контейнера с 0 сообщениями

С помощью MessageRoutingMode.RoundRobinPartition и MessageRoutingMode.SinglePartition мне удалось отправить сообщение от производителя.

Было бы очень полезно, если бы кто-то пролил свет на это.

1 Ответ

0 голосов
/ 06 марта 2020

Во-первых, учтите, что разделенные темы должны быть явно созданы до того, как издатель начнет отправлять сообщения, например:

bin/pulsar-admin topics create-partitioned-topic persistent://tenant/namespace/partitioned-topic-name --partitions 5

Во-вторых, следующая строка вызовет исключение (она не может обрабатывать отрицательные значения в качестве входных данных):

return r.nextInt((0 - 5) + 1);

Можно использовать следующее:

return r.nextInt(5);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...