Как проверить, вызывается ли метод с @KafkaListener - PullRequest
1 голос
/ 31 мая 2019

Я действительно изо всех сил пытаюсь написать тест, чтобы проверить, правильно ли вызывается мой Kafka Consumer при отправке сообщений в назначенную ему тему.

Мой потребитель:

@Service
@Slf4j
@AllArgsConstructor(onConstructor = @__(@Autowired))
public class ProcessingConsumer {

  private AppService appService;

  @KafkaListener(
      topics = "${topic}",
      containerFactory = "processingConsumerContainerFactory")
  public void listen(ConsumerRecord<Key, Value> message, Acknowledgment ack) {
    try {
      appService.processMessage(message);
      ack.acknowledge();
    } catch (Throwable t) {
      log.error("error while processing message!", t);
    }
  }
}

Мой потребительский конфиг:

@EnableKafka
@Configuration
public class ProcessingCosumerConfig {

  @Value("${spring.kafka.schema-registry-url}")
  private String schemaRegistryUrl;

  private KafkaProperties props;

  public ProcessingCosumerConfig(KafkaProperties kafkaProperties) {
    this.props = kafkaProperties;
  }

  public Map<String, Object> deserializerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

    return props;
  }

  private KafkaAvroDeserializer getKafkaAvroDeserializer(Boolean isKey) {
    KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
    kafkaAvroDeserializer.configure(deserializerConfigs(), isKey);
    return kafkaAvroDeserializer;
  }

  private DefaultKafkaConsumerFactory consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
        props.buildConsumerProperties(),
        getKafkaAvroDeserializer(true),
        getKafkaAvroDeserializer(false));
  }

  @Bean(name = "processingConsumerContainerFactory")
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Key, Value>>
      kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<Key, Value>
        factory = new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(new SeekToCurrentErrorHandler());
    return factory;
  }
}

Наконец, мой (подражательный) тест:

@DirtiesContext
public class ProcessingConsumerTest extends BaseIntegrationTest{

  @Autowired private ProcessingProducerFixture processingProducer;
  @Autowired private ProcessingConsumer processingConsumer;
  @org.springframework.beans.factory.annotation.Value("${topic}")
  String topic;

  @Test
  public void consumer_shouldConsumeMessages_whenMessagesAreSent() throws Exception{
    Thread.sleep(1000);
    ProducerRecord<Key, Value> message = new ProducerRecord<>(topic, new Key("b"), new Value("a", "b", "c", "d"));
    processingProducer.send(message);
  }
}

И этооб этом у меня пока все.Я попытался проверить, дойдет ли этот подход до потребителя вручную с помощью отладки, а также даже просто разместить простые отпечатки, но выполнение, похоже, просто не доходит.Кроме того, если мои тесты могут как-то правильно вызываться, я понятия не имею, что делать, чтобы фактически утверждать это в реальном тесте.

1 Ответ

0 голосов
/ 31 мая 2019

Вставьте фиктивный AppService в слушатель и убедитесь, что его processMessage () был вызван.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...