Отправьте полезную нагрузку json в тему Apache Kafka весной - PullRequest
0 голосов
/ 20 ноября 2018

Мне нужно отправить (Post) полезную нагрузку Json в тему Apache Kafka, но я получаю следующую ошибку: - "message": "Не удается преобразовать значение класса com.xyz.User в классorg.apache.kafka.common.serialization.StringSerializer, указанный в value.serializer "

Также Spring показывает исключение приведения класса: - java.lang.ClassCastException: com.xyz.User не может быть приведен к java.lang.String

Ниже приводится мой модал, конфигурация kafka и контроллер

public class User {
    private String firstname;
    private String email;


    public User() {}
    public User(String firstname, String email) {
        super();
        this.firstname = firstname;
        this.email = email;
    }
    public String getFirstname() {
        return firstname;
    }
    public void setFirstname(String firstname) {
        this.firstname = firstname;
    }
    public String getEmail() {
        return email;
    }
    public void setEmail(String email) {
        this.email = email;
    }
    @Override
    public String toString() {
        return "UserModel [firstname=" + firstname + ", email=" + email + "]";
    }


}

Конфигурация Kafka

 package config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.xyz.User;

@Configuration
public class KafkaConfiguration {

    @Bean
    public ProducerFactory<String, User> producerFactory(){
        Map<String, Object> config = new HashMap<>();

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(config);

    }
     @Bean
        public KafkaTemplate<String, User> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }

}

Контроллер

@RestController
@RequestMapping("/kafka")
public class UserController {
    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    private static String TOPIC = "kafka-producer";
    @PostMapping("/publish")
    public void getUserId(@RequestBody User user) {

        kafkaTemplate.send(TOPIC, user);

Json Payload отправлено почтальоном

{
    "firstname" : "xyz",
    "email" : "xyz@gmail.com.com"

}

Ответы [ 3 ]

0 голосов
/ 20 ноября 2018

У меня сработало следующее.

public class UserController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

private static String TOPIC = "kafka-producer";
@PostMapping("/publish")
public void getUserId(@RequestBody User user) {
    ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
    String json = null;
    try {
        json = ow.writeValueAsString(user);
    } catch (JsonProcessingException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }   
kafkaTemplate.send(TOPIC, json);


}
0 голосов
/ 20 ноября 2018

Возможность воспроизвести вашу ошибку при моей локальной настройке.

Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.example.demo.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: com.example.demo.User cannot be cast to java.lang.String

Если мы посмотрим на журнал, то он говорит, что value.serializer значение по-прежнему относится к значению по умолчанию StringSerializer, а не к ожидаемому значению JsonSerializer, что вTurn говорит, что ваша конфигурация производителя не вступает в силу.Короче говоря, ваш класс KafkaCOnfiguration не передается.

Ваш класс KafkaConfiguration находится в некотором пакете config, а класс User - в некотором пакете com.xyz.Таким образом, решение заключается в том, чтобы убедиться, что он получает вашу конфигурацию.Скорее всего, этот пакет может не сканироваться на предмет определения конфигурации / компонентов.Если вы переместите KafkaConfiguration в корневой пакет вашего приложения, тогда ваш оригинальный код должен работать нормально.

Если вы говорите, что ваш объект KafkaTemplate вводится, то на самом деле это не так.Тот, который вводится, определяется автоконфигурацией Spring kafka.

0 голосов
/ 20 ноября 2018

Перед отправкой в ​​KafkaTemplate преобразуйте ваш объект User в строку String.И преобразовать String в объект User на стороне потребителя.

Производитель:

@PostMapping("/publish")
public void getUserId(@RequestBody User user) {
    kafkaTemplate.send(TOPIC, new ObjectMapper().writeValueAsString(user));
}

Потребитель:

User user = new ObjectMapper().readValue(kafkaMessage, User.class);
...