Spring Boot Kafka newb ie вопрос по сериализации / десериализации - PullRequest
0 голосов
/ 01 февраля 2020

Я новичок в Kafka (использую Spring Boot 2.2.4), и я вижу примеры, где KafkaTemplate это String, String, чтобы просто отправить строку. Я смотрел на отправку Json объектов, и я вижу там 2 разных подхода ... некоторые люди используют String, Object, а некоторые используют String, TheActualModelClass.

Есть ли плюсы / минусы между двумя ? Я предпочитаю предположить, что основная разница состоит в том, что типизированный шаблон будет полезен только для одной модели, тогда как объект может отправлять любой тип на любой topi c? Что-нибудь кроме этого?

Ответы [ 3 ]

0 голосов
/ 01 февраля 2020

При использовании библиотеки spring-kafka я бы предложил использовать пружины JsonSerializer и JsonDeserializer, чтобы избежать большого количества кода пластины котла, вы можете найти более подробную информацию о Сериализации, десериализации и преобразовании сообщений

Apache Кафка предоставляет только высокоуровневый API для сериализации и десериализации, поэтому пользователю нужна настраиваемая реализация для сериализации или десериализации

org.apache.kafka.common.serialization.Serializer<T>
org.apache.kafka.common.serialization.Deserializer<T>

Apache Кафка обеспечивает высокую API уровня для сериализации и десериализации значений записей, а также их ключей. Он присутствует в абстракциях org. apache .kafka.common.serialization.Serializer и org. apache .kafka.common.serialization.Deserializer с некоторыми встроенными реализациями. Между тем, мы можем указать классы сериализатора и десериализатора, используя свойства конфигурации Producer или Consumer.

Но Spring-kafka обеспечивает JsonSerializer и JsonDeserializer на основе ObjectMapper

Spring для Apache Kafka также предоставляет реализации JsonSerializer и JsonDeserializer, основанные на объектном преобразователе Jackson JSON. JsonSerializer позволяет записывать любой объект Java в виде байта JSON []. JsonDeserializer требует дополнительный аргумент Class targetType, чтобы разрешить десериализацию потребляемого байта [] в соответствующий целевой объект.

А также обеспечивает десериализацию различных типов JSON объектов для соответствующих java классов POJO, используя Отображения типов и @ KafkaListener для класса

0 голосов
/ 10 марта 2020

Хотя я могу опоздать с ответом, но это может быть полезно для тех, кто ищет решение. Подробное решение можно посмотреть по адресу https://github.com/CODINGSAINT/kafka-stream-spring

Подумайте, есть ли у нас нестандартный java боб

public class Quote {
private String content;
private Set<String> tags;
private String author;
}

Вы также должны написать Kafka Producer в качестве потребительских конфигураций

 /**
 * Configurations for KafkaStreams
 * @param kafkaProperties Will take defaults from application YAML or Properties file with spring.kafka
 * @return kafkaConfiguration
 */
@Bean(name= KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaConfiguration(final KafkaProperties kafkaProperties){
    Map<String, Object> config = new HashMap<>();
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, QuoteSerde.class.getName() );
    config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
    return new KafkaStreamsConfiguration(config);
}

/**
 * The Stream which delegates each incoming topic to respective destination topic
 * @param kStreamsBuilder
 * @return
 */
@Bean
public KStream<String,Quote> kStream(StreamsBuilder kStreamsBuilder){
    KStream<String,Quote> stream=kStreamsBuilder.stream(inputTopic);
    for(String topic:allTopics){
        stream.filter((s, quote) -> quote.getTags().contains(topic)).to(topic);
    }
    return stream;

}

/**
 * Kafka ConsumerFactory configurations
 * @return
 */
@Bean
public ConsumerFactory<String, Quote> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaProperties.getBootstrapServers());
    props.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            BytesDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

/**
 * Required Configuration for POJO to JSON
 * @return ConcurrentKafkaListenerContainerFactory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Quote>
kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Quote> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}

Затем нам потребуется сериализатор

public class QuoteSerializer implements Serializer<Quote> {

    @Override
    public byte[] serialize(String s, Quote quote) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            retVal = objectMapper.writeValueAsString(quote).getBytes();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return retVal;
    }
}

и десериализатор

public class QuoteDeserializer implements Deserializer<Quote> {

    @Override
    public Quote deserialize(String s, byte[] bytes) {
        ObjectMapper mapper = new ObjectMapper();
        Quote quote = null;
        try {
            quote = mapper.readValue(bytes, Quote.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return quote;
    }
}

Для использования как сериализатора, так и десериализатора Serde

public class QuoteSerde implements Serde<Quote> {
    public QuoteSerde() {
    }

    @Override
    public Serializer<Quote> serializer() {
        return new QuoteSerializer();
    }

    @Override
    public Deserializer<Quote> deserializer() {
        return new QuoteDeserializer();
    }
}

Теперь наш слушатель может слушать

@Component
public class TopicConsumers {
    private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumers.class);

    @Value("#{'${kafka.topic.output}'.split(',')}")
    private List<String> allTopics;

    /**
     * For simplicity we are listening all topics at one listener
     */

    @KafkaListener(id = "allTopics", topics = "#{'${kafka.topic.output}'.split(',')}",
            containerFactory = "kafkaListenerContainerFactory")
    public void consume(@Payload Quote quote,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String incomingTopic,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
    ) {
        LOGGER.info("Incoming quote {}-> {}", incomingTopic, quote);
    }
}

Ниже приведен файл application.yml

spring:
  kafka:
    listener:
      missing-topics-fatal: false
    client-id : quotes-app
    bootstrap-server:
      - localhost:9091
      - localhost:9001
      - localhost:9092
    template:
      default-topic: quotes
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.codingsaint.learning.kafkastreamspring.QuoteSerializer
    consumer:
      properties:
        partition:
          assignment:
            strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
      group-id: random-consumer
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.codingsaint.learning.kafkastreamspring.QuoteDeserializer
---
kafka:
  topic:
    input: quotes
    output: business,education,faith,famous-quotes,friendship,future,happiness,inspirational,life,love,nature,politics,proverb,religion,science,success,technology
0 голосов
/ 01 февраля 2020

JSON это просто строка. В конце кодовой последовательности ваш объект модели все равно будет сериализован в байты

Какой вы предпочитаете, зависит от того, сколько кода сериализатора Kafka вы хотите абстрагировать

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...