Вот пример того, как обрабатывать сообщения в пакетном режиме.
Пример демонстрации Kafka Batch listener с сообщениями в формате Avro.
Приложение имеет пользовательский конвертер сообщений (MessageConverter), который преобразует сообщения Avro напрямую в POJO. Он использует файлы схем из classpath. Файлы схем именуются по соглашению "topicName.avsc".
public class AvroSchemaMessageConverter extends MessagingMessageConverter {
private AvroMapper avroMapper;
private SchemaRegistry schemaRegistry;
private KafkaHeaderMapper headerMapper;
public AvroSchemaMessageConverter(AvroMapper avroMapper, SchemaRegistry schemaRegistry) {
this.avroMapper = avroMapper;
this.schemaRegistry = schemaRegistry;
if (JacksonPresent.isJackson2Present()) {
this.headerMapper = new DefaultKafkaHeaderMapper();
} else {
this.headerMapper = new SimpleKafkaHeaderMapper();
}
}
@Override
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
System.out.printf(record.value().getClass().getName());
ByteBuffer buffer = ByteBuffer.wrap((byte[])record.value());
JavaType javaType = TypeFactory.defaultInstance().constructType(type);
try {
return avroMapper.readerFor(javaType).with(schemaRegistry.getAvroSchema(record.topic()))
.readValue(buffer.array(), buffer.arrayOffset(), buffer.limit());
} catch (IOException e) {
throw new ConversionException("Failed to convert AvroMessage", e);
}
}
@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
MessageHeaders headers = message.getHeaders();
Object topicHeader = headers.get(KafkaHeaders.TOPIC);
String topic = null;
if (topicHeader instanceof byte[]) {
topic = new String(((byte[]) topicHeader), StandardCharsets.UTF_8);
} else if (topicHeader instanceof String) {
topic = (String) topicHeader;
} else if (topicHeader == null) {
Assert.state(defaultTopic != null, "With no topic header, a defaultTopic is required");
} else {
throw new IllegalStateException(KafkaHeaders.TOPIC + " must be a String or byte[], not "
+ topicHeader.getClass());
}
String actualTopic = topic == null ? defaultTopic : topic;
Integer partition = headers.get(KafkaHeaders.PARTITION_ID, Integer.class);
Object key = headers.get(KafkaHeaders.MESSAGE_KEY);
Object payload = convertPayload(message, topic);
Long timestamp = headers.get(KafkaHeaders.TIMESTAMP, Long.class);
Headers recordHeaders = initialRecordHeaders(message);
if (this.headerMapper != null) {
this.headerMapper.fromHeaders(headers, recordHeaders);
}
return new ProducerRecord(topic == null ? defaultTopic : topic, partition, timestamp, key,
payload,
recordHeaders);
}
protected Object convertPayload(Message<?> message, String topic) {
try {
return avroMapper.writer(schemaRegistry.getAvroSchema(topic))
.writeValueAsBytes(message.getPayload());
} catch (JsonProcessingException e) {
throw new ConversionException("Failed to convert object to AvroMessage", e);
}
}
Вот как нам нужно настроить ConsumerFactory и KafkaListenerContainerFactory:
/**
 * Kafka configuration wiring the batch listener container factory and the
 * {@link KafkaTemplate} to the custom Avro {@code RecordMessageConverter}.
 */
@Configuration
@EnableKafka
public class KafkaConfiguration {

    /**
     * Container factory for batch listeners; wraps the Avro converter in a
     * {@link BatchMessagingMessageConverter} and uses manual acknowledgement.
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>>
            kafkaListenerContainerFactory(ConsumerFactory<String, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true); // required so listeners receive whole batches
        factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        return factory;
    }

    /**
     * Template for producing; shares the Avro converter so {@code send(Message)}
     * payloads are serialized with the target topic's schema.
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        // Fully parameterized (the original declared a raw KafkaTemplate local).
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        kafkaTemplate.setMessageConverter(converter());
        return kafkaTemplate;
    }

    @Bean
    public RecordMessageConverter converter() {
        return new AvroSchemaMessageConverter(avroMapper(), schemaRegistry());
    }

    @Bean
    public SchemaRegistry schemaRegistry() {
        return new SchemaRegistry();
    }

    /**
     * AvroMapper that ignores unknown fields, omits null values, and writes dates
     * in textual rather than numeric-timestamp form.
     */
    @Bean
    public AvroMapper avroMapper() {
        AvroMapper mapper = new AvroMapper();
        mapper.configure(Feature.IGNORE_UNKNOWN, true);
        mapper.setSerializationInclusion(Include.NON_NULL);
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return mapper;
    }
}