У меня есть потребительское приложение с весенним облачным потоком. Это приложение потребляет сообщения из очереди (Kafka), и на каждое сообщение приложение выполняет 4 разных HTTP-вызова на 4 разных серверах, один из которых очень медленный (10 секунд ответа). Когда очередь полна сообщений, таких как 6000, приложение вылетает из-за нескольких причин (1 - нетти не хватает прямой памяти, 2 - мы используем реактор и пул потоков становится пустым).
Есть ли какой-нибудь способ ограничить скорость потребления на стороне потребителя либо через весенне-облачный поток, либо через кафку? Было бы неплохо, например, максимальное количество сообщений в секунду.
Здесь вы можете увидеть конфигурацию для kafka (application.yml)
spring:
kafka:
bootstrap-servers: my-cloud-kafka-instance
admin:
ssl:
protocol: SSL
properties:
security.protocol: SSL
cloud:
stream:
bindings:
input:
group: my-group
destination: my-destination
content-type: application/json
А вот мой потребитель (в kotlin):
@Controller
@EnableBinding(Processor::class)
class MyConsumer(
myDependendies
) {
@StreamListener(Processor.INPUT)
fun myMethod(
@Headers headers: Map<String, String>,
@Payload myMessage: Message
) {
myBussinessLogic
}
}