Поскольку ваши POST-запросы поступают непосредственно в Kafka, один к одному, неясно, подходит ли потоковая модель, предоставляемая Akka Streams. Возможно, было бы проще использовать API KafkaProducer
напрямую, чтобы отправлять сообщения по одному на Kafka topi c. Это значительно упростит код и, в частности, обработку ошибок.
Все еще может быть веская причина предпочесть использование потоков Akka, например, для управления параллелизмом или упорядочением. Указанные c причины могут повлиять на лучший подход к нему, поэтому, если у вас есть более подробная информация по этому поводу, пожалуйста, добавьте к вопросу, и я обновлю ответ.
Например, если вы хотите чтобы гарантировать отсутствие одновременных операций записи, вы можете зарегистрировать класс как синглтон в Guice (при условии, что вы используете Guice для внедрения зависимостей). Когда он создан, он запускает Akka Stream с использованием Source.queue
, Source.actorRef
или Source.actorRefWithBackpressure
вместе с Kafka Sink
. Также потребуется использовать RestartSource
, чтобы обеспечить перезапуск потока в случае ошибки. Предоставьте метод publi c, который позволяет вызывающим абонентам отправлять сообщения в материализованную исходную очередь или субъект, а затем внедрить синглтон в ваш класс контроллера, чтобы разрешить ему публиковать sh свои данные POST с помощью этого метода.
Большой недостаток этого подхода заключается в том, что действие контроллера может знать только то, что сообщение было успешно отправлено в поток, а не то, что оно было успешно записано полностью в Kafka. Если брокеры Kafka не работают, приложение дает сбой или запись не удается по какой-либо другой причине, сообщение может быть потеряно даже после возврата успешного результата инициирующему клиенту. Это не проблема для всех случаев использования, но ее необходимо учитывать.