Как реализовать потребителя Kafka для обработки событий по требованию с использованием Spring-Cloud-Stream? - PullRequest
1 голос
/ 29 марта 2019

Я пытаюсь реализовать PollableConsumer, который начинает опрашивать сообщения от Kafka при определенных условиях, в этом случае, когда я достигаю конечной точки в моем приложении SpringBoot.

Я пробовал несколько способов запуска опроса при определенных условиях, но, видимо, он работает, только если он постоянно опрашивает тему кафки. (как и во всех примерах в документах Spring-Cloud-Stream)

Я ищу что-то вроде этого:

public interface CustomProcessor {
    @Input
    PollableMessageSource input();
}
 public void run() {
        boolean result = true;
        while (result) {
            result = input.poll(m -> {
                Event event = (Event) m.getPayload();
                GenericMessage<Event> genericMessage = new GenericMessage<>(event, m.getHeaders());
                eventMessageConsumer.consume(genericMessage);
            }, new ParameterizedTypeReference<Event>() {
            });

            try {
                Thread.sleep(1_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (result) {
                System.out.println("Success");
            }
        }
    }

Это может сработать, когда я достигну такой конечной точки:

@GetMapping("/process")
public void process() {
   SomeClass.run();
}

Ответы [ 2 ]

0 голосов
/ 02 апреля 2019

Очевидно, что на данный момент невозможно приостановить PollableConsumer с помощью Spring-Cloud-Stream, поэтому я вернулся к основанному на событиях потреблению сообщений и использовал привод для управления состоянием привязки.Вслед за этим потоком и весенним облачным потоком docs я ввел BindingsEndpoint и изменил состояние привязки следующим образом:

@RestController
public class EventController {
    @Autowired
    public EventController(BindingsEndpoint bindingsEndpoint) {
        this.bindingsEndpoint = bindingsEndpoint;
    }
    @GetMapping("/changeState")
    public void sendMessage(@RequestParam("state") String state) {
        if (state.equals("paused")) {
            bindingsEndpoint.changeState("MY_BINDING", 
                           BindingsEndpoint.State.PAUSED);
        }
        if (state.equals("resumed")) {
            bindingsEndpoint.changeState("MY_BINDING", 
                           BindingsEndpoint.State.RESUMED);
        }
    }

Это неименно то, чего я хотел достичь, но это достаточно близко.

0 голосов
/ 29 марта 2019

В вашем примере кода я не уверен, что возвращает input.poll, поскольку вы присваиваете результат boolean, а это не то, что возвращает метод KafkaConsumer.poll.

A КафкаПотребитель может быть приостановлен и перезапущен по требованию, это не должно быть проблемой.

С последней KafkaConsumer javadoc:

Управление потоком потребления

...

Kafka поддерживает динамическое управление потоками потребления, используя pause (Collection) и resume (Collection) для приостановки потребления науказанных назначенных разделов и возобновить потребление на указанных приостановленных разделах соответственно в будущих вызовах опросов (Duration).

А затем из описания метода pause:

Обратите внимание, что этот метод не влияет на подписку раздела.В частности, не вызывает группового перебалансирования при использовании автоматического назначения.

https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Так что в вашем случае вы могли бы просто иметьФлаг volatile boolean isPaused, и если он был изменен, вы будете вводить команды pause или resume по мере необходимости.KafkaConsumer сделает все остальное.

Также обратите внимание, что KafkaConsumer не поточно-ориентирован, поэтому вы должны выполнять эти команды в том же потоке, который опрашивает данные.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...