У меня горизонтально масштабируемое приложение.Мое приложение представляет собой сервис весенней загрузки с версией Spring Framework 2.1.1.RELEASE.
У каждого работника есть очередь (и соответствующий слушатель), и он связывается с обменом типа "x-compatibility-hash"».Я использую плагин rabbitmq-constant-hash-exchange версии 3.7.7 Rabbitmq.
Сообщения корректно хэшируются согласованно в соответствии с их message_id (, как описано здесь )и всегда используются одними и теми же рабочими очередями моего приложения, как показано выше:
Мой код для достижения этой цели следующий:
public static final String EXCHANGE = "e3";
private static final String EXCHANGE_TYPE = "x-consistent-hash";
@Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}
@Qualifier("testlistenerAdapter")
@Bean
MessageListenerAdapter testlistenerAdapter(TestListener receiver) {
MessageListenerAdapter msgadapter = new MessageListenerAdapter(receiver, "testMessageReceived");
return msgadapter;
}
@Qualifier("testcontainer")
@Bean
SimpleMessageListenerContainer testcontainer(ConnectionFactory connectionFactory,
@Qualifier("testlistenerAdapter") MessageListenerAdapter listenerAdapter) throws IOException {
Connection conn = connectionFactory.createConnection();
Channel ch = conn.createChannel(true);
ch.queueDeclare(autoDeleteQueue1().getName(), true, true, true, null);
ch.queuePurge(autoDeleteQueue1().getName());
Map<String, Object> args = new HashMap<>();
args.put("hash-property", "message_id");
ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, false, false, args);
ch.queueBind(autoDeleteQueue1().getName(), EXCHANGE, "20");
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(autoDeleteQueue1().getName());
container.setMessageListener(listenerAdapter);
return container;
}
То, что я хочу сделать, это направить все сообщения с тем же message_id на том же работнике, что и раньше, но на другом слушателе очереди.В первом случае все было просто, потому что у одного работника была только одна очередь.Теперь, когда один работник имеет более одной очереди, я хочу, чтобы сообщение последовательно хэшировалось на одном и том же работнике.
Я попытался сделать это, добавив разные заголовки / routing_key для каждого сообщения, но безуспешно.
Например, любое сообщение с message_id: «test» всегда маршрутизируется на одном и том же работнике (скажем, «worker B») и в зависимости от того, какой заголовок / ключ_рассылки оно имеет (foo или bar), будет использованослушатели очереди foo или bar (см. изображение ниже).Бизнес-логика моего приложения отслеживает состояние, поэтому мне нужно, чтобы сообщения с одинаковым message_id обслуживались одним и тем же работником.
Я перепробовал много вариантов реализации, но, кажется, это не тривиальная вещь.Я боюсь, что то, что я хочу сделать, не выполнимо, потому что плагин rabbitmq выполняет хеширование уровня очереди.Быстрое решение было бы застрять в первом случае.Оставьте только один тип слушателей, а затем выполните разделение бизнес-логики в одной и той же конечной точке.Любые идеи о том, как я мог бы реализовать такую функциональность, не объединяя функциональность двух слушателей (foo и bar) в одном?