Итак, я получил MQTT -> MQTT и AMQP -> AMQP для работы;перевод MQTT -> AMQP, похоже, где-то не работает.Вот мой тест, он проходит, если мой «слушатель» также находится в MQTT с использованием paho, но эта реализация rabbitmq этого не делает.
@SpringBootTest
@SpringJUnitConfig
internal open class ProvisioningTest @Autowired constructor(
private val mqtt: IMqttAsyncClient,
private val mapper: ObjectMapper
) {
@Test
fun provision() {
val entity = Foley(
rfid = UUID.randomUUID().toString(),
)
val called = AtomicBoolean(false)
mqtt.subscribe("foley/created", 1) { _, _ -> called.set(true) }
mqtt.publish("foley/new", MqttMessage(mapper.writeValueAsBytes(entity)))
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilTrue(called)
}
}
это слушатель, который публикует сохраненную сущность в другой очереди;он никогда не вызывается, когда я публикую в MQTT.
@Service
open class Provisioning(private val repo: FoleyRepo) {
private val log: Logger = LogManager.getLogger(this::class.java)
@SendTo("foley.created")
@RabbitListener(queuesToDeclare = [Queue("foley.new")] )
open fun listen(entity: Foley): Foley {
log.trace("saving: {}", entity)
val save = repo.save(entity)
log.debug("saved: {}", save)
return save
}
}
всю мою конфигурацию обмена сообщениями
@Configuration
open class MessagingConfig {
@Bean
open fun client(
@Value("tcp://\${mqtt.client.host:localhost}:\${mqtt.client.port:1883}") uri: String,
@Value("\${mqtt.client.user:#{null}}") user: String?,
@Value("\${mqtt.client.pass:#{null}}") pass: CharArray?
): IMqttAsyncClient {
val connOpt = MqttConnectOptions()
user?.let { connOpt.userName = it }
pass?.let { connOpt.password = it }
connOpt.isCleanSession = false
connOpt.isAutomaticReconnect = true
val client = MqttAsyncClient(uri, MqttAsyncClient.generateClientId(), MemoryPersistence())
client.connect(connOpt)
return client
}
@Bean
open fun messageConverter( om: ObjectMapper): MessageConverter {
return Jackson2JsonMessageConverter(om)
}
@Bean
open fun builder(): Jackson2ObjectMapperBuilderCustomizer {
return Jackson2ObjectMapperBuilderCustomizer {
it.modules(JavaTimeModule(), KotlinModule())
}
}
}
с использованием официального докера rabbitmq с включенным mqtt.
Что мне нужно исправить, чтобы сделать эту работу?