Spring cloud Кафка ограничивает потребление сообщений за единицу времени - PullRequest
0 голосов
/ 12 февраля 2020

У меня есть потребительское приложение с весенним облачным потоком. Это приложение потребляет сообщения из очереди (Kafka), и на каждое сообщение приложение выполняет 4 разных HTTP-вызова на 4 разных серверах, один из которых очень медленный (10 секунд ответа). Когда очередь полна сообщений, таких как 6000, приложение вылетает из-за нескольких причин (1 - нетти не хватает прямой памяти, 2 - мы используем реактор и пул потоков становится пустым).

Есть ли какой-нибудь способ ограничить скорость потребления на стороне потребителя либо через весенне-облачный поток, либо через кафку? Было бы неплохо, например, максимальное количество сообщений в секунду.

Здесь вы можете увидеть конфигурацию для kafka (application.yml)

spring:
  kafka:
    bootstrap-servers: my-cloud-kafka-instance
    admin:
      ssl:
        protocol: SSL
    properties:
      security.protocol: SSL
  cloud:
    stream:
      bindings:
        input:
          group: my-group
          destination: my-destination
          content-type: application/json

А вот мой потребитель (в kotlin):

@Controller
@EnableBinding(Processor::class)
class MyConsumer(
        myDependendies
) {

    @StreamListener(Processor.INPUT)
    fun myMethod(
            @Headers headers: Map<String, String>,
            @Payload myMessage: Message
    ) {
        myBussinessLogic
    }
}

1 Ответ

1 голос
/ 12 февраля 2020

Pollable Message Source позволяет потребителям контролировать нормы потребления. Например, чтобы проиллюстрировать кратко, мы сначала определяем интерфейс:

public interface PolledProcessor {

    @Input
    PollableMessageSource destIn();

    @Output
    MessageChannel destOut();

}

Используйте примеры:

@Autowired
private PolledProcessor polledProcessor;


@Scheduled(fixedDelay = 5_000)
public void poll() {
    polledProcessor.destIn().poll(message -> {
        byte[] bytes = (byte[]) message.getPayload();
        String payload = new String(bytes);
        logger.info("Received: " + payload);
        polledProcessor.destOut().send(MessageBuilder.withPayload(payload.toUpperCase())
                .copyHeaders(message.getHeaders())
                .build());
    });
}

Справочные ресурсы:

https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers

...