Как проверить ConsumerAwareRebalanceListener? - PullRequest
0 голосов
/ 24 октября 2018

Я разработал @KafkaListener, который также помечен интерфейсом ConsumerAwareRebalanceListener, используя Spring Boot 2.0.6.Я реализовал метод onPartitionsAssigned, в котором я перематывал смещение на фиксированное количество времени, скажем, 60 секунд.

Пока все хорошо.

Как я могу протестировать приведенный выше вариант использования, используя инструменты, которые предоставляет мне Spring Kafka?Я предположил, что мне нужно запустить брокер Kafka (т. Е. EmbeddedKafka), затем остановить слушатель и снова перезагрузить его, чтобы проверить, что он снова читает сообщения, полученные за последние 60 секунд.

Можеткто-нибудь, помогите мне?Я немного погуглил, но ничего не нашел.Большое спасибо.

Ответы [ 2 ]

0 голосов
/ 24 октября 2018
public class MyRebalanceListener implements ConsumerAwareRebalanceListener {

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        long rewindTo = System.currentTimeMillis() - 60000;
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(partitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> rewindTo)));
        offsetsForTimes.forEach((k, v) -> consumer.seek(k, v.offset()));
    }

}

и

@RunWith(SpringRunner.class)
@SpringBootTest
public class So52973119ApplicationTests {

    @Test
    public void rebalanceListenerTests() {
        MyRebalanceListener listener = new MyRebalanceListener();
        Consumer<?, ?> consumer = mock(Consumer.class);
        AtomicLong expected = new AtomicLong(System.currentTimeMillis() - 60_000);
        given(consumer.offsetsForTimes(anyMap())).willAnswer(i -> {
            AtomicLong offset = new AtomicLong();
            Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>();
            Map<TopicPartition, Long> argument = i.getArgument(0);
            argument.forEach((k, v) -> {
                offsetsForTimes.put(k, new OffsetAndTimestamp(offset.incrementAndGet(), 0L));
                assertThat(v).isBetween(expected.get(), expected.get() + 1_000);
            });
            return offsetsForTimes ;
        });
        TopicPartition t1 = new TopicPartition("foo", 0);
        TopicPartition t2 = new TopicPartition("foo", 1);
        List<TopicPartition> partitions = new ArrayList<>();
        partitions.add(t1);
        partitions.add(t2);
        listener.onPartitionsAssigned(consumer, partitions);
        verify(consumer).seek(t1, 1);
        verify(consumer).seek(t2, 2);
    }

}
0 голосов
/ 24 октября 2018

@KafkaListener имеет атрибут:

/**
 * The unique identifier of the container managing for this endpoint.
 * <p>If none is specified an auto-generated one is provided.
 * @return the {@code id} for the container managing for this endpoint.
 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
 */
String id() default "";

, так что вы можете получить доступ к его MessageListenerContainer через упомянутый KafkaListenerEndpointRegistry, который вы можете просто @Autowired в тестовый класс на основена Spring Testing Framework.Тогда вы действительно можете stop() и start(), что MessageListenerContainer в вашем методе тестирования.

Также обратите внимание, как @KafkaListener имеет также атрибут autoStartup().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...