Вызывается: org. apache .kafka.common.errors.SerializationException: Невозможно десериализовать данные [ - PullRequest
0 голосов
/ 19 апреля 2020

Я работаю над Spring Batch и Apache Kafka Stream . Вдохновение от: https://www.youtube.com/watch?v=UJesCn731G4. В этом примере я просто пытаюсь прочитать поток клиентов, желающих записать эти данные в CSV.

Ошибка:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition customers-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105, 100, 34, 58, 49, 44, 34, 102, 105, 114, 115, 116, 78, 97, 109, 101, 34, 58, 34, 32, 74, 111, 104, 110, 34, 44, 34, 108, 97, 115, 116, 78, 97, 109, 101, 34, 58, 34, 32, 68, 111, 101, 34, 44, 34, 98, 105, 114, 116, 104, 100, 97, 116, 101, 34, 58, 123, 34, 109, 111, 110, 116, 104, 34, 58, 34, 79, 67, 84, 79, 66, 69, 82, 34, 44, 34, 121, 101, 97, 114, 34, 58, 49, 57, 53, 50, 44, 34, 100, 97, 121, 79, 102, 89, 101, 97, 114, 34, 58, 50, 56, 52, 44, 34, 100, 97, 121, 79, 102, 77, 111, 110, 116, 104, 34, 58, 49, 48, 44, 34, 100, 97, 121, 79, 102, 87, 101, 101, 107, 34, 58, 34, 70, 82, 73, 68, 65, 89, 34, 44, 34, 104, 111, 117, 114, 34, 58, 49, 48, 44, 34, 109, 105, 110, 117, 116, 101, 34, 58, 49, 48, 44, 34, 109, 111, 110, 116, 104, 86, 97, 108, 117, 101, 34, 58, 49, 48, 44, 34, 110, 97, 110, 111, 34, 58, 48, 44, 34, 115, 101, 99, 111, 110, 100, 34, 58, 49, 48, 44, 34, 99, 104, 114, 111, 110, 111, 108, 111, 103, 121, 34, 58, 123, 34, 105, 100, 34, 58, 34, 73, 83, 79, 34, 44, 34, 99, 97, 108, 101, 110, 100, 97, 114, 84, 121, 112, 101, 34, 58, 34, 105, 115, 111, 56, 54, 48, 49, 34, 125, 125, 125]] from topic [customers]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `java.time.LocalDateTime` (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
 at [Source: (byte[])"{"id":1,"firstName":" John","lastName":" Doe","birthdate":{"month":"OCTOBER","year":1952,"dayOfYear":284,"dayOfMonth":10,"dayOfWeek":"FRIDAY","hour":10,"minute":10,"monthValue":10,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}"; line: 1, column: 60] (through reference chain: com.example.demo.model.Customer["birthdate"])
    at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[jackson-databind-2.10.3.jar:2.10.3]
    at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1592) ~[jackson-databind-2.10.3.jar:2.10.3]
    at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1058) ~[jackson-databind-2.10.3.jar:2.10.3]
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1297) ~[jackson-databind-2.10.3.jar:2.10.3]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326) ~[jackson-databind-2.10.3.jar:2.10.3]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.10.3.jar:2.10.3]
    at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:129) ~[jackson-databind-2.10.3.jar:2.10.3]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:369) ~[jackson-databind-2.10.3.jar:2.10.3]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.10.3.jar:2.10.3]
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1719) ~[jackson-databind-2.10.3.jar:2.10.3]
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1282) ~[jackson-databind-2.10.3.jar:2.10.3]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:438) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.1.jar:na]
    at org.springframework.batch.item.kafka.KafkaItemReader.read(KafkaItemReader.java:164) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:180) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:126) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_151]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_151]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_151]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_151]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) [spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at com.sun.proxy.$Proxy63.run(Unknown Source) [na:na]
    at com.example.demo.SpringBatchKafkaReaderApplication.run(SpringBatchKafkaReaderApplication.java:37) [classes/:na]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at com.example.demo.SpringBatchKafkaReaderApplication.main(SpringBatchKafkaReaderApplication.java:27) [classes/:na]

CustomerRowMapper. java

public class CustomerRowMapper implements RowMapper<Customer> {
    private static final DateTimeFormatter DT_FORMAT = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");

    @Override
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
        return Customer.builder()
                .id(rs.getLong("id"))
                .firstName(rs.getString("firstName"))
                .lastName(rs.getString("lastName"))
                .birthdate(LocalDateTime.parse(rs.getString("birthdate"), DT_FORMAT))
                .build();
    }
}

Customer. java

@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Customer {
    private Long id;
    private String firstName;
    private String lastName;
    private LocalDateTime birthdate;
}

CustomerLineAggregator. java

public class CustomerLineAggregator implements LineAggregator<Customer>{
    private ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public String aggregate(Customer item) {
        try {
            return objectMapper.writeValueAsString(item);
        } catch (Exception e) {
            throw new RuntimeException("Unable to Serialized Customer", e);
        }
    }
}

JobConfiguration. java

@Configuration
public class JobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private KafkaProperties properties;

    @Bean
    public KafkaItemReader<Long, Customer> kafkaItemReader() {
        Properties props = new Properties();
        props.putAll(this.properties.buildConsumerProperties());

        return new KafkaItemReaderBuilder<Long, Customer>()
            .partitions(0)
            .consumerProperties(props)
            .name("customers-reader")
            .saveState(true)
            .topic("customers")
            .build();
    }


    @Bean
    public FlatFileItemWriter<Customer> customerItemWriter() throws Exception{
        String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
        System.out.println(">> Output Path = "+customerOutputPath);

        FlatFileItemWriter<Customer> itemWriter = new FlatFileItemWriter<>();
        //A LineAggregator implementation that simply calls Object.toString() on the given object
        //itemWriter.setLineAggregator(new PassThroughLineAggregator<>());

        //Alternate ways
        itemWriter.setLineAggregator(new CustomerLineAggregator());

        itemWriter.setResource(new FileSystemResource(customerOutputPath));
        itemWriter.afterPropertiesSet();

        return itemWriter;
    }


    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(100)
                .reader(kafkaItemReader())
                .writer(customerItemWriter())
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job").incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}

application.properties

##
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=customers-group
##
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.client-id=customers-client
##
spring.kafka.consumer.properties.spring.json.trusted.packages=*
##
spring.kafka.template.default-topic=customers

spring.batch.job.enabled=false

Данные, хранящиеся в Кафке, вот так, что мне нужно делать?

enter image description here

1 Ответ

0 голосов
/ 19 апреля 2020

Сериализатору необходим пользовательский ObjectMapper с сериализатором LocalDateTime.

Для этого вам необходимо создать сериализатор как объект, а не через свойства.

Связыватель Кафки в настоящее время не поддерживает это.

Чтобы обойти это; создайте свой собственный сериализатор, который переносит или подклассы JsonSerializer, для которого ObjectMapper настроен с модулем JavaTime - см. https://github.com/FasterXML/jackson-modules-java8

Это автоматически обрабатывается в spring-kafka 2.3 и более поздние версии .

at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:438) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]

Поскольку вы уже используете Spring Kafka 2.3, вам просто нужно добавить jar модуля в путь к классам.

...