Я возился с RabbitMQ и столкнулся с определенной проблемой, касающейся публикации сообщений на канал от обратного вызова потребителя.Хотя я в состоянии нормально подтвердить сообщение, попытка опубликовать сообщение на канале ничего не дает.
Ниже приведен пример:
// Stuff here...
final Connection connection = connectionFactory.newConnection();
final Channel channel = connection.createChannel();
channel.basicQos(16);
channel.basicConsume("transcoding", false, "someConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
executorService.submit(() -> {
TestClass testClass = new TestClass();
testClass.addObserver((o, arg) -> {
// And stuff here...
channel.basicAck(envelope.getDeliveryTag(), true);
channel.basicPublish("", "notification", null, "message");
});
testClass.run();
})
}
});
Как показано, я обрабатываю сообщенияв отдельном потоке, используя ExecutoreService
.У меня есть класс, который реализует Runnable
и расширяет Observable
.Как только TestClass
завершит свою работу, наблюдатель уведомляется, и я вручную подтверждаю сообщение.Это отлично работает.
Теперь я хочу опубликовать новое сообщение, однако RabbitMQ никогда не получает опубликованное сообщение.Я пытался создать новый канал и использовать его для публикации, но это тоже не работает.Затем я подумал, что это может быть проблема с многопоточностью, но публикация в отдельном потоке также не работает.
Я собирался попытаться создать отдельное соединение RabbitMQ только для публикации, но для меня это не имеет большого смыслатак как каналы должны быть однонаправленными.Я прочитал заметки RabbitMQ о параллелизме и не вижу ничего, что могло бы показаться.
Чего мне не хватает?