Невозможно создать сообщение для Kafka topi c с использованием Transactional.Sink в Alpakka, но я вижу, что идемпотентный производитель включен - PullRequest
0 голосов
/ 21 марта 2020

Привет. Я пытался использовать API-интерфейс Producer, как показано в документации Alpakka. Я могу использовать запись с использованием Transactional source, и Producer создан, но не могу поместить сообщение в тему. Не могу произвести toi topi c с помощью Transactional.Sink в Alpakka, но я вижу, что идемпотентный производитель включен. Я вижу логи, что он поступает в логи c Но он не генерирует события для myTopi c

[info] oak c .p.KafkaProducer - [Producer clientId = provider-7fe8789 c - 3171-429e-afbf-d8a8ba12700 c, транзакционныйId = 7fe8789 c -3171-429e-afbf-d8a8ba12700c] идемпотентный производитель включен.

Не могли бы вы помочь мне понять, почему он может не выдавать сообщение to topi c

Я запускаю свой код локально, используя docker

Ниже мой код

``` Transactional.source(consumerSettings,
            Subscriptions.topics(topicNames))
            .mapMaterializedValue(innerControl = _)
            .map(consumerRecord => {

              handleBusiness(consumerRecord)
                .flatMap(res => Source.single(res)
                       .runWith(Transactional.sink(producerSettings, 
                                           UUID.randomUUID().toString)))

            })

        }
          source.runWith(Sink.ignore)

And my handleBusiness logics looks like below:

   ```
 private def handleBusiness(consumedMessage: ConsumerMessage.TransactionalMessage[String, String]): Future[Envelope[String, String, PartitionOffset]]  = {

       (conversion of consumedMessage ) map { message =>

            ProducerMessage.single(new ProducerRecord("myTopic", consumedMessage.record.key, message ), consumedMessage.partitionOffset)

     }


 }```


Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...