@KafkaListener не использует сообщения - проблема с десериализацией - PullRequest
3 голосов
/ 19 марта 2019

Генератор сообщений, использующий привязки Кафки из облачных потоков Spring

@Component
public static class PageViewEventSource implements ApplicationRunner {

private final MessageChannel pageViewsOut;
private final Log log = LogFactory.getLog(getClass());

public PageViewEventSource(AnalyticsBinding binding) {
    this.pageViewsOut = binding.pageViewsOut();
}

@Override
public void run(ApplicationArguments args) throws Exception {
    List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry");
    List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about");
    Runnable runnable = () -> {
        String rPage = pages.get(new Random().nextInt(pages.size()));
        String rName = pages.get(new Random().nextInt(names.size()));
        PageViewEvent pageViewEvent = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000);


        Serializer<PageViewEvent> serializer = new JsonSerde<>(PageViewEvent.class).serializer();
        byte[] m = serializer.serialize(null, pageViewEvent);

        Message<byte[]> message =  MessageBuilder
                .withPayload(m).build();

        try {
            this.pageViewsOut.send(message);
            log.info("sent " + message);
        } catch (Exception e) {
            log.error(e);
        }
    };
    Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
}

Это использование ниже сериализации

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde = org.apache.kafka.common.serialization.Serdes $ StringSerde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde = org.apache.kafka.common.serialization.Serdes $ BytesSerde

Я пытаюсь использовать эти сообщения в отдельном приложении для потребителей через Spring Kafka - KafkaListener

@Service
public class PriceEventConsumer {


private static final Logger LOG = LoggerFactory.getLogger(PriceEventConsumer.class);

@KafkaListener(topics = "test1" , groupId = "json", containerFactory = "kafkaListenerContainerFactory")
public void receive(Bytes data){
//public void receive(@Payload PageViewEvent data,@Headers MessageHeaders headers) {
    LOG.info("Message received");
    LOG.info("received data='{}'", data);

    }

Контейнерная заводская конфигурация

 @Bean
 public ConsumerFactory<String, Bytes> consumerFactory() {

  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  StringDeserializer.class);
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
  BytesDeserializer.class);
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

  return new DefaultKafkaConsumerFactory<>(props);

 }

 @Bean
 public ConcurrentKafkaListenerContainerFactory<String, Bytes> 

 kafkaListenerContainerFactory() {

  ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  return factory;
 }

При такой конфигурации потребитель не принимает сообщения (в байтах). Если я изменю слушателя Кафки, чтобы он принимал String, тогда это дает мне исключение:

   @KafkaListener(topics = "test1" , groupId = "json", containerFactory = "kafkaListenerContainerFactory")

 public void receive(String data){

    LOG.info("Message received");
    LOG.info("received data='{}'", data);

    }

Вызывается:

org.springframework.messaging.converter.MessageConversionException: не удается обработать сообщение; вложенным исключением является org.springframework.messaging.converter.MessageConversionException: невозможно преобразовать из [org.apache.kafka.common.utils.Bytes] в [java.lang.String] для GenericMessage [payload = {"userId": «facebook» , "страница": "о", "длительность": 10}, заголовки = {kafka_offset = 4213, kafka_consumer=brave.kafka.clients.TracingConsumer@9a75f94, kafka_timestampType = create_time, kafka_receivedMessageKey = NULL, kafka_receivedPartitionId = 0, kafka_receivedTopic = test1 , kafka_receivedTimestamp = 1553007593670}], failedMessage = GenericMessage [payload = {"userId": "facebook", "page": "about", "duration": 10}, заголовки = {kafka_offset = 4213, kafka_consumer = brave.kafka. clients.TracingConsumer@9a75f94, kafka_timestampType = CREATE_TIME, kafka_receivedMessageKey = null, kafka_receivedPartitionId = 0, kafka_receivedTopic = test1, kafka_receivedTimestamp = 1553009] 157500 ... еще 23 Вызывается: org.springframework.messaging.converter.MessageConversionException: невозможно преобразовать из [org.apache.kafka.common.utils.Bytes] в [java.lang.String] для GenericMessage [payload = {"userId": «facebook» , "страница": "о", "длительность": 10}, заголовки = {kafka_offset = 4213, kafka_consumer=brave.kafka.clients.TracingConsumer@9a75f94, kafka_timestampType = create_time, kafka_receivedMessageKey = NULL, kafka_receivedPartitionId = 0, kafka_receivedTopic = test1 , kafka_receivedTimestamp = 1553007593670}] at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument (PayloadArgumentResolver.java:144) ~ [spring-messaging-5.1.4.RELEASE.jar: 5.1.4.RELEASE] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument (HandlerMethodArgumentResolverComposite.java:117) ~ [spring-messaging-5.1.4.RELEASE.jar: 5.1.4.ELE at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues ​​(InvocableHandlerMethod.java:147) ~ [spring-messaging-5.1.4.RELEASE.jar: 5.1.4.RELEASE] в org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke (InvocableHandlerMethod.java:116) ~ [spring-messaging-5.1.4.RELEASE.jar: 5.1.4.RELEASE] в org.springframework.kafka.listener.adapter.HandlerAdapter.invoke (HandlerAdapter.java:48) ~ [spring-kafka-2.2.3.RELEASE.jar: 2.2.3.RELEASE] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler (MessagingMessageListenerAdapter.java:283) ~ [spring-kafka-2.2.3.RELEASE.jar: 2.2.3.RELEASE] ... еще 22 * ​​1024 *

Любые указатели будут очень полезны.

Обновление POJO Part

Партия Pojo ——

  @KafkaListener(topics = "test1" , groupId = "json", containerFactory = "kafkaListenerContainerFactory")

  public void receive(@Payload PageViewEvent data,@Headers MessageHeaders headers) {
    LOG.info("Message received");
    LOG.info("received data='{}'", data);

   }

Контейнерная заводская конфигурация

 @Bean
 public ConsumerFactory<String,PageViewEvent > priceEventConsumerFactory() {

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(PageViewEvent.class));



}

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> priceEventsKafkaListenerContainerFactory() {
      ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
     factory.setConsumerFactory(priceEventConsumerFactory());
     return factory;
     }

Производитель -

  @Override
  public void run(ApplicationArguments args) throws Exception {
  List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry");
   List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about");
   Runnable runnable = () -> {
    String rPage = pages.get(new Random().nextInt(pages.size()));
    String rName = pages.get(new Random().nextInt(names.size()));
    PageViewEvent pageViewEvent = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000);

    Message<PageViewEvent> message =  MessageBuilder
            .withPayload(pageViewEvent).build();
                    try {
        this.pageViewsOut.send(message);
        log.info("sent " + message);
    } catch (Exception e) {
        log.error(e);
    }
};

1 Ответ

0 голосов
/ 19 марта 2019

Вы можете десериализовать запись из kfka в POJO, для версий <2.2.x используйте <code>MessageConverter

Начиная с версии 2.2, вы можете явно настроить десериализатор для использования предоставленного целевого типаи игнорировать информацию о типе в заголовках, используя один из перегруженных конструкторов с логическим значением

@Bean
public ConsumerFactory<String,PageViewEvent > priceEventConsumerFactory() {

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(PageViewEvent.class,false));

}

или используя MessageConverter

 @Bean
 public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {

 ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
 factory.setConsumerFactory(consumerFactory());
 factory.setMessageConverter(new StringJsonMessageConverter());
 return factory;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...