Как преобразовать JSON, полученное от моего потребителя Spring Kafka, в организацию, готовую для сохранения в банке? - PullRequest
1 голос
/ 20 апреля 2020

Мне нужно использовать JSON из тем Kafka и преобразовывать их в сущности, чтобы я мог сохранить в базе данных Postgres, в случае

Конфигурация потребителя:

@Configuration
@EnableKafka
public class ConsumerConfiguration {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        properties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "Kafka");
        return properties;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(String.class));
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public EventConsumer consumer() {
        return new EventConsumer();
    }
}

Класс потребителя :

@Slf4j
@Data
public class EventConsumer {
    private EntidadeComercialRepository entidadeComercialRepository;
    private CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(topics = "${topic.entidades_comerciais}")
    public void kafkaConsumer(@Payload EntidadesComerciais entidadesComerciais) {
        entidadeComercialRepository.save(entidadesComerciais);
        log.info("received payload='{}'", entidadesComerciais);
        latch.countDown();
    }
}

Класс частичных сущностей

@lombok.Data
@NoArgsConstructor
@AllArgsConstructor
@Entity(name = "ENTIDADES_COMERCIAIS")
@Builder
public class EntidadesComerciais {
    @Id
    @Column(name = "CODIGO_DA_ENTIDADE_COMERCIAL")
    private Long codigoDaEntidadeComercial;

    @Column(name = "NOME_DA_ENTIDADE_COMERCIAL")
    private String nomeDaEntidadeComercial;

    @Column(name = "NOME_COMERCIAL")
    private String nomeComercial;

    @Column(name = "TIPO_DA_ENTIDADE_COMERCIAL")
    private String tipoDaEntidadeComercial;
    ...

Я использую Spring Kafka и Postgres. Когда программа запускается, я получаю это исключение:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void br.com.dchristofolli.kafka.EventConsumer.kafkaConsumer(br.com.dchristofolli.kafka.entity.EntidadesComerciais)]
Bean [EventConsumer(entidadeComercialRepository=null, latch=java.util.concurrent.CountDownLatch@144e36ae[Count = 1])]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [br.com.dchristofolli.kafka.entity.EntidadesComerciais] for GenericMessage [payload={"topic":"ENTIDADES_COMERCIAIS"...

Мне нужно знать, что я сделал неправильно, или если мне нужно изменить какую-либо часть моего кода, чтобы иметь возможность преобразовать полезную нагрузку, полученную в моей сущности ENTIDADES_COMERCIAIS Спасибо всем, кто может мне помочь

1 Ответ

1 голос
/ 20 апреля 2020

Невозможно преобразовать из [java .lang.String] в [br.com.dchristofolli.kafka.entity.EntidadesComerciais

Вместо этого вы указываете десериализатору создать строку EntidadesComerciais.

new JsonDeserializer<>(String.class)
...