Spring-Kafka не может преобразовать AVRO GenericData.Record в подтверждение - PullRequest
0 голосов
/ 08 февраля 2019

Используя Spring Boot, я пытаюсь настроить своих потребителей Kafka в режиме пакетного приема:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter()); // I know this one won't work
    factory.setBatchListener(true);
    return factory;
}

@Bean
public ConsumerFactory<GenericData.Record, GenericData.Record> consumerFactory() {
    Map<String, Object> dataRiverProps = getDataRiverProps();
    dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
    return new DefaultKafkaConsumerFactory<>(dataRiverProps);
}

И вот как выглядит фактический потребитель:

@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = 'kafkaListenerContainerFactory')
public void consumeAvro(List<GenericData.Record> list, Acknowledgment ack) {
    messageProcessor.addMessageBatchToExecutor(list);
    while (messageProcessor.getTaskSize() > EXECUTOR_TASK_COUNT_THRESHOLD) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            LOGGER_ERROR.error(ExceptionUtils.getStackTrace(e.getCause()));
        }
    }
}

исключения, которые я получаю, выглядят так:

nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [org.apache.avro.generic.GenericData$Record] to type [org.springframework.kafka.support.Acknowledgment]
        at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:46)
        at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:191)
        at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:174)
        at org.springframework.messaging.converter.GenericMessageConverter.fromMessage(GenericMessageConverter.java:66)

Сообщения Kafka являются сообщениями AVRO, и я хотел бы получить их в виде строк JSON.Существует ли готовый для использования конвертер AVRO для GenericData.Record, который я могу подключить к ConcurrentKafkaListenerContainerFactory?Спасибо!

1 Ответ

0 голосов
/ 19 февраля 2019

Вот пример того, как использовать сообщения в пакетном режиме.

Пример демонстрации Kafka для Batch listener с авро-сообщением formart

Приложение имеет пользовательское сообщениеКонвертор, который преобразует сообщения Avro в Pojo напрямую. Он использует файлы схемы в classPath.Файлы схемы именуются по условию "topicName" .avsc

public class AvroSchemaMessageConverter extends MessagingMessageConverter {

  private AvroMapper avroMapper;
  private SchemaRegistry schemaRegistry;
  private KafkaHeaderMapper headerMapper;


  public AvroSchemaMessageConverter(AvroMapper avroMapper, SchemaRegistry schemaRegistry) {
    this.avroMapper = avroMapper;
    this.schemaRegistry = schemaRegistry;
    if (JacksonPresent.isJackson2Present()) {
      this.headerMapper = new DefaultKafkaHeaderMapper();
    } else {
      this.headerMapper = new SimpleKafkaHeaderMapper();
    }
  }

  @Override
  protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
    System.out.printf(record.value().getClass().getName());
    ByteBuffer buffer = ByteBuffer.wrap((byte[])record.value());
    JavaType javaType = TypeFactory.defaultInstance().constructType(type);
    try {
      return avroMapper.readerFor(javaType).with(schemaRegistry.getAvroSchema(record.topic()))
        .readValue(buffer.array(), buffer.arrayOffset(), buffer.limit());
    } catch (IOException e) {
      throw new ConversionException("Failed to convert AvroMessage", e);
    }
  }

  @Override
  public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
    MessageHeaders headers = message.getHeaders();
    Object topicHeader = headers.get(KafkaHeaders.TOPIC);
    String topic = null;
    if (topicHeader instanceof byte[]) {
      topic = new String(((byte[]) topicHeader), StandardCharsets.UTF_8);
    } else if (topicHeader instanceof String) {
      topic = (String) topicHeader;
    } else if (topicHeader == null) {
      Assert.state(defaultTopic != null, "With no topic header, a defaultTopic is required");
    } else {
      throw new IllegalStateException(KafkaHeaders.TOPIC + " must be a String or byte[], not "
        + topicHeader.getClass());
    }
    String actualTopic = topic == null ? defaultTopic : topic;
    Integer partition = headers.get(KafkaHeaders.PARTITION_ID, Integer.class);
    Object key = headers.get(KafkaHeaders.MESSAGE_KEY);
    Object payload = convertPayload(message, topic);
    Long timestamp = headers.get(KafkaHeaders.TIMESTAMP, Long.class);
    Headers recordHeaders = initialRecordHeaders(message);
    if (this.headerMapper != null) {
      this.headerMapper.fromHeaders(headers, recordHeaders);
    }
    return new ProducerRecord(topic == null ? defaultTopic : topic, partition, timestamp, key,
      payload,
      recordHeaders);
  }

  protected Object convertPayload(Message<?> message, String topic) {
    try {
      return avroMapper.writer(schemaRegistry.getAvroSchema(topic))
        .writeValueAsBytes(message.getPayload());
    } catch (JsonProcessingException e) {
      throw new ConversionException("Failed to convert object to AvroMessage", e);
    }
  }

Вот как нам нужно настроить ConsumerFactory и KafkaListenerContainerFactory:

@Configuration
@EnableKafka
public class KafkaConfiguration {

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>>
  kafkaListenerContainerFactory(ConsumerFactory<String, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true); // This is needed for batch listener
    factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    return factory;

  }

  @Bean
  public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
    KafkaTemplate kafkaTemplate = new KafkaTemplate<String, Object>(producerFactory);
    kafkaTemplate.setMessageConverter(converter());
    return kafkaTemplate;
  }

  @Bean
  public RecordMessageConverter converter() {
    return new AvroSchemaMessageConverter(avroMapper(), schemaRegistry());
  }

  @Bean
  public SchemaRegistry schemaRegistry() {
    return new SchemaRegistry();
  }

  @Bean
  public AvroMapper avroMapper() {
    AvroMapper mapper = new AvroMapper();
    mapper.configure(Feature.IGNORE_UNKNOWN, true);
    mapper.setSerializationInclusion(Include.NON_NULL);
    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    return mapper;
  }

}
...