Spring-cloud-stream MessageConversionException - PullRequest
       45

Spring-cloud-stream MessageConversionException

0 голосов
/ 27 сентября 2019

У меня есть @StreamListener, который принимает строку @Payload.Для тестирования этого класса Listener я написал класс Junit, используя встроенный Kafka.Я получаю приведенную ниже ошибку при запуске моего тестового класса

Ошибка

ОШИБКА osihandler.LoggingHandler - org.springframework.messaging.converter.MessageConversionException: Невозможно enter code here преобразовать из [[B] в[java.lang.String] для GenericMessage


Если я изменю dataType @Payload с String на byte [], сообщение будет выбрано моим классом слушателя.

Может кто-нибудь помочь мне узнать, в чем здесь проблема?Я думаю, что это что-то с конфигурацией облачного потока.


@ExtendWith(SpringExtension.class)
@DirtiesContext
@SpringBootTest(classes = IntegrationTestStoreInventoryConsumer.class)
@EmbeddedKafka(partitions = 1, controlledShutdown = true,
        topics = {
                "process_clear_faulty_item_by_other_business_process_uq_jp_0.0.1_dev",
                "process_clear_faulty_item_by_other_business_process_dlq_uq_jp_0.0.1_dev"})
public class ProcessClearTransactionDataListenerTest {

    public static final String KEY_SERIALIZER = "key.serializer";
    public static final String VALUE_SERIALIZER = "value.serializer";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @BeforeEach
    public void setup() {
        System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getBrokersAsString());
    }

    @Test
    public void someTest() throws Exception {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        senderProps.put(KEY_SERIALIZER, StringSerializer.class);
        senderProps.put(VALUE_SERIALIZER, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory, true);
        template.setDefaultTopic("process_clear_faulty_item_by_other_business_process_uq_jp_0.0.1_dev");
        template.sendDefault("foo");

        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
                "process_clear_faulty_item_by_other_business_process_group_dlq_uq_jp_0.0.1_dev",
                "false",
                this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put("key.deserializer", StringDeserializer.class);
        consumerProps.put("value.deserializer", StringDeserializer.class);
        DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);

        Consumer<String, String> consumer = cf.createConsumer();
        consumer.subscribe(Collections.singleton("process_clear_faulty_item_by_other_business_process_dlq_uq_jp_0.0.1_dev"));
        ConsumerRecords<String, String> records = consumer.poll(10_000);
        consumer.commitSync();
        Assertions.assertThat(records.count()).isGreaterThanOrEqualTo(1);

#### Вот мое приложение. Yaml выглядит так.

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        bindings:
          process-clear-faulty-item-by-other-business-process-in:
            consumer:
              enable-dlq: true
              dlq-name: process_clear_faulty_item_by_other_business_process_dlq_uq_jp_0.0.1_dev
              dlq-producer-properties:
                retries: 1
        binder:
          brokers: ${spring.embedded.kafka.brokers}
          replicationFactor: ${replication_factor:1}
          autoCreateTopics: true
          autoAddPartitions: true
          configuration:
            retries: 1
            batch.size: 16384
            linger.ms: 1
            enable.idempotence: true
            buffer.memory: 33554432
            request.timeout.ms: 3000
            transaction.timeout.ms: 3000
            max.block.ms: ${kafka_max_block_time:5000}
            max.poll.records: 80
            poll.timeout: 10000
            commit.retries: 1
            commit.retry.interval: 1000
            session.timeout.ms.config: 50000
            shutdown.signal: INT,TERM
            acks: "all"
      bindings:
        process-clear-faulty-item-by-other-business-process-out:
          destination: process_clear_faulty_item_by_other_business_process_uq_jp_0.0.1_dev
          contentType: application/json
          producer:
            partitionCount: ${partition_count:1}
        process-clear-faulty-item-by-other-business-process-in:
          destination: process_clear_faulty_item_by_other_business_process_uq_jp_0.0.1_dev
          contentType: application/json
          partitioned: true
          group: process_clear_faulty_item_by_other_business_process_group_uq_jp_0.0.1_dev

...