Я действительно изо всех сил пытаюсь написать тест, чтобы проверить, правильно ли вызывается мой Kafka Consumer при отправке сообщений в назначенную ему тему.
Мой потребитель:
@Service
@Slf4j
@AllArgsConstructor(onConstructor = @__(@Autowired))
public class ProcessingConsumer {
private AppService appService;
@KafkaListener(
topics = "${topic}",
containerFactory = "processingConsumerContainerFactory")
public void listen(ConsumerRecord<Key, Value> message, Acknowledgment ack) {
try {
appService.processMessage(message);
ack.acknowledge();
} catch (Throwable t) {
log.error("error while processing message!", t);
}
}
}
Мой потребительский конфиг:
@EnableKafka
@Configuration
public class ProcessingCosumerConfig {
@Value("${spring.kafka.schema-registry-url}")
private String schemaRegistryUrl;
private KafkaProperties props;
public ProcessingCosumerConfig(KafkaProperties kafkaProperties) {
this.props = kafkaProperties;
}
public Map<String, Object> deserializerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
return props;
}
private KafkaAvroDeserializer getKafkaAvroDeserializer(Boolean isKey) {
KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
kafkaAvroDeserializer.configure(deserializerConfigs(), isKey);
return kafkaAvroDeserializer;
}
private DefaultKafkaConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(
props.buildConsumerProperties(),
getKafkaAvroDeserializer(true),
getKafkaAvroDeserializer(false));
}
@Bean(name = "processingConsumerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Key, Value>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Key, Value>
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
}
Наконец, мой (подражательный) тест:
@DirtiesContext
public class ProcessingConsumerTest extends BaseIntegrationTest{
@Autowired private ProcessingProducerFixture processingProducer;
@Autowired private ProcessingConsumer processingConsumer;
@org.springframework.beans.factory.annotation.Value("${topic}")
String topic;
@Test
public void consumer_shouldConsumeMessages_whenMessagesAreSent() throws Exception{
Thread.sleep(1000);
ProducerRecord<Key, Value> message = new ProducerRecord<>(topic, new Key("b"), new Value("a", "b", "c", "d"));
processingProducer.send(message);
}
}
И этооб этом у меня пока все.Я попытался проверить, дойдет ли этот подход до потребителя вручную с помощью отладки, а также даже просто разместить простые отпечатки, но выполнение, похоже, просто не доходит.Кроме того, если мои тесты могут как-то правильно вызываться, я понятия не имею, что делать, чтобы фактически утверждать это в реальном тесте.