Как искать смещение для отметки времени с пружиной для Кафки - PullRequest
0 голосов
/ 07 декабря 2018

Мы используем spring-kafka для приема сообщений, которые должны быть перенаправлены во внешний интерфейс как события, отправленные сервером (SSE).

Когда пользователь входит в систему, он должен увидеть все пропущенные события.с момента ее последнего сеанса.

Текущая реализация использует ConsumerSeekCallback, как описано в этот ответ

Однако этот обратный вызов не поддерживает метод offsetForTimes базового KafkaConsumer ( KafkaConsumer # offsetForTimes ).

Поэтому мне нужно использовать seekToBeginning и фильтр для отметки времени, что приведет к проблемам при большом количестве сообщений ...

IsЕсть ли другой способ получать только сообщения с заданной отметкой времени?Может быть, безопасный способ непосредственного использования потребителем?

Ответы [ 2 ]

0 голосов
/ 13 декабря 2018

Как отметил Гари Рассел выше, ConsumerSeekCallback является устаревшим, поэтому он не разрешен ... и я не буду открывать вопрос GitHub ...

Я наконец смог достичь своей цели:

Когда пользователь входит в систему, он должен видеть все события, которые он пропустил со времени своего последнего сеанса.

путем обработки всех новых подписок в EventListener дляListenerContainerIdleEvent, где потребитель доступен как часть данных события:

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void idleEventHandler(ListenerContainerIdleEvent event) {

        // find new subscriptions
        Collection<EventListenerSubscription> newSubscriptions = 
                subscriptions.stream().filter(s -> s.isNew())
                .collect(Collectors.toList());

        if (!newSubscriptions.isEmpty()) {

            // mark subscriptions a not new
            newSubscriptions.forEach(s -> s.setNew(false));

            // compute the oldest time stamp
            OptionalLong oldestTimeStamp = 
                    newSubscriptions.stream()
                    .mapToLong(s -> s.getLastTimeStamp())
                    .reduce(Long::min);

            if (oldestTimeStamp.isPresent()) {

                // seek on topic for oldest time stamp
                Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
                timestampsToSearch.put(new TopicPartition(eventTopic, 0),
                                       oldestTimeStamp.getAsLong());
                Consumer<?, ?> consumer = event.getConsumer();
                event.getConsumer().offsetsForTimes(timestampsToSearch).forEach((k, v) -> {
                    consumer.seek(k, v.offset());
                });
            }
        }
    }

Я определяю самую старую отметку времени во всех новых подписках, помечаю эти подписки как не новые и использую поиск потребителя нараздел для самой старой отметки времени.

Чтобы получить событие простоя контейнера, интервал простоя должен быть настроен в свойствах контейнера, как описано здесь .

Затем KafkaListener позаботится об отправке старых событий подписчикам (ранее новым):

    @KafkaListener(id = "qux", topics = { "${app.event.topic}" }, errorHandler = "kafkaListenerErrorHandler")
    public void receive(@Payload Event event, @Headers MessageHeaders headers) throws JsonProcessingException {

        // collect the subscribers not marked as new
        Collection<EventListenerSubscription> oldSubscriptions = 
                subscriptions.stream().filter(s -> !s.isNew())
                .collect(Collectors.toList());

        for (EventListenerSubscription s : oldSubscriptions) {
            if (s.getLastTimeStamp() < timestamp) {
                s.addMessage(event, timestamp);
            }
        }
    }
0 голосов
/ 07 декабря 2018

2.0 представил ConsumerAwareRebalanceListener (текущая версия 2.2.2).

См. Как проверить ConsumerAwareRebalanceListener? для примера.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...