Привет. Я пытался использовать API-интерфейс Producer, как показано в документации Alpakka. Я могу использовать запись с использованием Transactional source, и Producer создан, но не могу поместить сообщение в тему. Не могу произвести toi topi c с помощью Transactional.Sink в Alpakka, но я вижу, что идемпотентный производитель включен. Я вижу логи, что он поступает в логи c Но он не генерирует события для myTopi c
[info] oak c .p.KafkaProducer - [Producer clientId = provider-7fe8789 c - 3171-429e-afbf-d8a8ba12700 c, транзакционныйId = 7fe8789 c -3171-429e-afbf-d8a8ba12700c] идемпотентный производитель включен.
Не могли бы вы помочь мне понять, почему он может не выдавать сообщение to topi c
Я запускаю свой код локально, используя docker
Ниже мой код
``` Transactional.source(consumerSettings,
Subscriptions.topics(topicNames))
.mapMaterializedValue(innerControl = _)
.map(consumerRecord => {
handleBusiness(consumerRecord)
.flatMap(res => Source.single(res)
.runWith(Transactional.sink(producerSettings,
UUID.randomUUID().toString)))
})
}
source.runWith(Sink.ignore)
And my handleBusiness logics looks like below:
```
private def handleBusiness(consumedMessage: ConsumerMessage.TransactionalMessage[String, String]): Future[Envelope[String, String, PartitionOffset]] = {
(conversion of consumedMessage ) map { message =>
ProducerMessage.single(new ProducerRecord("myTopic", consumedMessage.record.key, message ), consumedMessage.partitionOffset)
}
}```