Хотя я могу опоздать с ответом, но это может быть полезно для тех, кто ищет решение. Подробное решение можно посмотреть по адресу https://github.com/CODINGSAINT/kafka-stream-spring
Предположим, что у нас есть пользовательский Java bean (POJO)
/**
 * Simple POJO carried through Kafka topics as JSON.
 * <p>
 * Accessors are required: the stream topology filters on {@code getTags()},
 * and Jackson needs getters/setters (or fields made visible) to bind JSON.
 */
public class Quote {
    private String content;
    private Set<String> tags;
    private String author;

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public Set<String> getTags() {
        return tags;
    }

    public void setTags(Set<String> tags) {
        this.tags = tags;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }
}
Вам также нужно описать конфигурации Kafka: как для стороны producer, так и для стороны consumer.
/**
 * Builds the {@link KafkaStreamsConfiguration} consumed by Spring's default
 * StreamsBuilderFactoryBean.
 *
 * @param kafkaProperties defaults resolved from application YAML/properties under {@code spring.kafka}
 * @return the Kafka Streams configuration bean
 */
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaConfiguration(final KafkaProperties kafkaProperties) {
    final Map<String, Object> config = new HashMap<>();
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    // NOTE(review): the client id doubles as the Streams application id (and thus the
    // consumer group id) — confirm it is unique per deployed application.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());
    // Use fully-qualified class names consistently for every class-valued entry
    // (the original mixed Class objects and name strings).
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, QuoteSerde.class.getName());
    // Log and skip records that fail deserialization instead of killing the stream thread.
    config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            LogAndContinueExceptionHandler.class.getName());
    return new KafkaStreamsConfiguration(config);
}
/**
 * Routes each incoming {@link Quote} from the input topic to every output topic
 * whose name appears in the quote's tag set.
 *
 * @param kStreamsBuilder the StreamsBuilder auto-configured by Spring
 * @return the source stream (exposed as a bean so the topology is materialized)
 */
@Bean
public KStream<String, Quote> kStream(StreamsBuilder kStreamsBuilder) {
    KStream<String, Quote> stream = kStreamsBuilder.stream(inputTopic);
    for (String topic : allTopics) {
        // Guard against null values (e.g. records the LogAndContinueExceptionHandler
        // skipped arrive as null) and quotes without tags — the original NPE'd here.
        stream.filter((key, quote) -> quote != null
                        && quote.getTags() != null
                        && quote.getTags().contains(topic))
                .to(topic);
    }
    return stream;
}
/**
 * Kafka ConsumerFactory for the listener containers.
 * <p>
 * Values are consumed as raw JSON strings; the {@code StringJsonMessageConverter}
 * registered on the container factory converts them to {@link Quote}. The original
 * {@code BytesDeserializer} handed the converter {@code Bytes} payloads it cannot
 * read, so the value deserializer must be {@code StringDeserializer}.
 *
 * @return consumer factory wired from the injected {@code spring.kafka} properties
 */
@Bean
public ConsumerFactory<String, Quote> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Was BytesDeserializer — incompatible with StringJsonMessageConverter.
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}
/**
 * Listener container factory that converts incoming JSON payloads into
 * {@link Quote} instances via a {@link StringJsonMessageConverter}.
 *
 * @return the configured ConcurrentKafkaListenerContainerFactory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Quote> kafkaListenerContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, Quote> listenerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(consumerFactory());
    listenerFactory.setMessageConverter(new StringJsonMessageConverter());
    return listenerFactory;
}
Затем нам потребуется сериализатор
/**
 * Serializes {@link Quote} values to JSON bytes for Kafka producers.
 */
public class QuoteSerializer implements Serializer<Quote> {
    // ObjectMapper is thread-safe and expensive to construct: share one instance
    // instead of building a new mapper on every record.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * @param topic the destination topic (unused)
     * @param quote the value to serialize; may be null (tombstone)
     * @return UTF-8 JSON bytes, or null when {@code quote} is null
     * @throws RuntimeException if Jackson fails to serialize the value
     */
    @Override
    public byte[] serialize(String topic, Quote quote) {
        if (quote == null) {
            return null;
        }
        try {
            // writeValueAsBytes always emits UTF-8 and skips the intermediate String
            // the original built with charset-dependent getBytes().
            return OBJECT_MAPPER.writeValueAsBytes(quote);
        } catch (Exception e) {
            // Fail loudly instead of silently producing a null record.
            throw new RuntimeException("Failed to serialize Quote to JSON", e);
        }
    }
}
и десериализатор
/**
 * Deserializes JSON bytes from Kafka into {@link Quote} values.
 */
public class QuoteDeserializer implements Deserializer<Quote> {
    // Shared, thread-safe mapper — the original built a new one per record.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * @param topic the source topic (unused)
     * @param bytes the raw payload; may be null (tombstone)
     * @return the decoded Quote, or null for a null payload
     * @throws RuntimeException on malformed JSON — rethrowing lets the configured
     *         {@code LogAndContinueExceptionHandler} skip the record; the original
     *         swallowed the error and returned null, bypassing that handler.
     */
    @Override
    public Quote deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try {
            return OBJECT_MAPPER.readValue(bytes, Quote.class);
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize Quote from JSON", e);
        }
    }
}
Serde объединяет сериализатор и десериализатор, чтобы их можно было использовать в Kafka Streams как единое целое:
/**
 * Serde pairing {@link QuoteSerializer} and {@link QuoteDeserializer} so that
 * {@link Quote} values can be used as a default value serde in Kafka Streams.
 */
public class QuoteSerde implements Serde<Quote> {

    /** Returns a fresh serializer per call, matching the original behavior. */
    @Override
    public Serializer<Quote> serializer() {
        return new QuoteSerializer();
    }

    /** Returns a fresh deserializer per call. */
    @Override
    public Deserializer<Quote> deserializer() {
        return new QuoteDeserializer();
    }
}
Теперь наш слушатель может слушать
/**
 * Consumes every routed output topic and logs each received quote.
 * The topic list is the same comma-separated {@code kafka.topic.output}
 * property used by the stream topology, split via SpEL.
 */
@Component
public class TopicConsumers {
    private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumers.class);

    // All destination topics, parsed from the comma-separated property.
    @Value("#{'${kafka.topic.output}'.split(',')}")
    private List<String> allTopics;

    /**
     * Single listener subscribed to all output topics at once (kept simple
     * on purpose — one consumer group "allTopics" across every topic).
     *
     * @param quote         the converted JSON payload
     * @param partition     partition the record arrived on
     * @param incomingTopic topic the record arrived from
     * @param ts            record timestamp in epoch milliseconds
     */
    @KafkaListener(id = "allTopics", topics = "#{'${kafka.topic.output}'.split(',')}",
            containerFactory = "kafkaListenerContainerFactory")
    public void consume(@Payload Quote quote,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String incomingTopic,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
    ) {
        LOGGER.info("Incoming quote {}-> {}", incomingTopic, quote);
    }
}
Ниже приведен файл application.yml
spring:
  kafka:
    listener:
      # Do not fail startup when the output topics have not been created yet.
      missing-topics-fatal: false
    client-id: quotes-app
    # Spring Boot's property is "bootstrap-servers" (plural); the singular
    # "bootstrap-server" is not bound and the client silently falls back to
    # localhost:9092.
    bootstrap-servers:
      - localhost:9091
      - localhost:9001   # NOTE(review): 9001 looks like a typo for 9093 — confirm broker ports
      - localhost:9092
    template:
      default-topic: quotes
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.codingsaint.learning.kafkastreamspring.QuoteSerializer
    consumer:
      properties:
        partition:
          assignment:
            strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
      group-id: random-consumer
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.codingsaint.learning.kafkastreamspring.QuoteDeserializer
---
kafka:
  topic:
    input: quotes
    output: business,education,faith,famous-quotes,friendship,future,happiness,inspirational,life,love,nature,politics,proverb,religion,science,success,technology