EmbeddedKafka не вызывает слушателя при использовании схемы avro - PullRequest
0 голосов
/ 08 мая 2020

Я пытался написать простой модульный тест Kafka Listener с использованием KafkaEmbedded. Однако мой слушатель не вызывается. Я использовал эту ссылку для вдохновения, так как мне также нужен Avro Serializer / DeSerializer.

Ниже показано, как выглядит мой тестовый класс.

@ExtendWith(SpringExtension.class)
public class KafkaTest{

    public static final String TOPIC_2 = "topic";

    @Autowired
    private Service listener;

    @Autowired
    private SomeClient someClient;

    @Autowired
    private KafkaTemplate<String, Avro> template;

    @Test
    void testSimple() {
        template.send(TOPIC_2, "test", Avro.newBuilder()
                .setGameProvider("abcd")
                .setMessageName("someMessageName")
                .setRequestId(UUID.randomUUID().toString())
                .setBody(UUID.randomUUID().toString())
                .build());
        verify(someClient).register(anyString(), anyString(), anyString(), anyString());
        template.flush();
    }

    @Configuration
    @EnableKafka
    public static class Config {

        @Bean
        public EmbeddedKafkaBroker kafkaEmbedded() {
            return new EmbeddedKafkaBroker(1, true, 1, TOPIC_2);
        }

        @Bean
        public ConsumerFactory<String, Avro> createConsumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomKafkaAvroDeserializer.class);
            props.put("schema.registry.url", "not-used");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, Avro> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, Avro> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(createConsumerFactory());
            return factory;
        }

        @MockBean
        private SomeClient client;

        @MockBean
        private Marshaller marshaller;

        @Bean
        public SomeService listener() {
            return new SomeService(client, marshaller, kafkaTemplate());
        }

        @Bean
        public ProducerFactory<String, Avro> producerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
            props.put(ProducerConfig.RETRIES_CONFIG, 1);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomKafkaAvroSerializer.class);
            props.put("schema.registry.url", "not-used");
            return new DefaultKafkaProducerFactory<>(props);
        }

        @Bean
        public KafkaTemplate<String, Avro> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
}

public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
    @Override
    public Object deserialize(String topic, byte[] bytes) {
        if (topic.equals("topic")) {
            this.schemaRegistry = getMockClient(SafeReport.SCHEMA$);
        }
        return super.deserialize(topic, bytes);
    }

    private static SchemaRegistryClient getMockClient(final Schema schema$) {
        return new MockSchemaRegistryClient() {
            @Override
            public synchronized Schema getById(int id) {
                return schema$;
            }
        };
    }
}

public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    public CustomKafkaAvroSerializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}

@Service
@EnableBinding(Sink.class)
public class SomeService {

    @KafkaListener(topics = "topic", groupId = "group-1")
    public void listen(ConsumerRecord<String, Avro> cr) {
        System.out.println(String.format("#### -> Consumed message -> %s", cr.toString()));
    }
}

Мой слушатель никогда не вызывается, когда я запускаю этот тест.

...