Источник HTTP-запроса в приемник Kafka? - PullRequest
0 голосов
/ 16 июня 2020

Как связать POST-маршрут воспроизведения с Kafka Sink?

Я нашел https://github.com/jamesward/hello-play-kafka. Но он использует тиковый источник случайных чисел для присоединения к приемнику Kafka. 1

Как сделать маршрут запроса POST источником для t ie в приемник Kafka?

Изменить: формат тела запроса POST - json с Content-Type приложения / json. Сообщение, отправленное кафке, должно быть таким же json. Маршрут ожидает одно сообщение json.

Ответы [ 2 ]

1 голос
/ 19 июня 2020

Поскольку ваши POST-запросы поступают непосредственно в Kafka, один к одному, неясно, подходит ли потоковая модель, предоставляемая Akka Streams. Возможно, было бы проще использовать API KafkaProducer напрямую, чтобы отправлять сообщения по одному на Kafka topi c. Это значительно упростит код и, в частности, обработку ошибок.

Все еще может быть веская причина предпочесть использование потоков Akka, например, для управления параллелизмом или упорядочением. Указанные c причины могут повлиять на лучший подход к нему, поэтому, если у вас есть более подробная информация по этому поводу, пожалуйста, добавьте к вопросу, и я обновлю ответ.

Например, если вы хотите чтобы гарантировать отсутствие одновременных операций записи, вы можете зарегистрировать класс как синглтон в Guice (при условии, что вы используете Guice для внедрения зависимостей). Когда он создан, он запускает Akka Stream с использованием Source.queue, Source.actorRef или Source.actorRefWithBackpressure вместе с Kafka Sink. Также потребуется использовать RestartSource, чтобы обеспечить перезапуск потока в случае ошибки. Предоставьте метод publi c, который позволяет вызывающим абонентам отправлять сообщения в материализованную исходную очередь или субъект, а затем внедрить синглтон в ваш класс контроллера, чтобы разрешить ему публиковать sh свои данные POST с помощью этого метода.

Большой недостаток этого подхода заключается в том, что действие контроллера может знать только то, что сообщение было успешно отправлено в поток, а не то, что оно было успешно записано полностью в Kafka. Если брокеры Kafka не работают, приложение дает сбой или запись не удается по какой-либо другой причине, сообщение может быть потеряно даже после возврата успешного результата инициирующему клиенту. Это не проблема для всех случаев использования, но ее необходимо учитывать.

1 голос
/ 17 июня 2020

Возможно, я неправильно понимаю ваш вопрос, но вы можете использовать REST Proxy для отправки сообщений в Kafka:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
      --data '{ "records": [ { "value": { "name": "testUser0" } }, { "value": { "name": "testUser1" } } ] }' \
      "http://localhost:8082/topics/jsontest"

Ref: Confluent REST Proxy docs

...