У меня есть @StreamListener, который принимает строку @Payload.Для тестирования этого класса Listener я написал класс Junit, используя встроенный Kafka.Я получаю приведенную ниже ошибку при запуске моего тестового класса
Ошибка
ОШИБКА osihandler.LoggingHandler - org.springframework.messaging.converter.MessageConversionException: Невозможно enter code here
преобразовать из [[B] в[java.lang.String] для GenericMessage
Если я изменю dataType @Payload с String на byte [], сообщение будет выбрано моим классом слушателя.
Может кто-нибудь помочь мне узнать, в чем здесь проблема?Я думаю, что это что-то с конфигурацией облачного потока.
@ExtendWith(SpringExtension.class)
@DirtiesContext
@SpringBootTest(classes = IntegrationTestStoreInventoryConsumer.class)
@EmbeddedKafka(partitions = 1, controlledShutdown = true,
topics = {
"process_clear_faulty_item_by_other_business_process_uq_jp_0.0.1_dev",
"process_clear_faulty_item_by_other_business_process_dlq_uq_jp_0.0.1_dev"})
public class ProcessClearTransactionDataListenerTest {
public static final String KEY_SERIALIZER = "key.serializer";
public static final String VALUE_SERIALIZER = "value.serializer";
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@BeforeEach
public void setup() {
System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getBrokersAsString());
}
@Test
public void someTest() throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put(KEY_SERIALIZER, StringSerializer.class);
senderProps.put(VALUE_SERIALIZER, StringSerializer.class);
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory, true);
template.setDefaultTopic("process_clear_faulty_item_by_other_business_process_uq_jp_0.0.1_dev");
template.sendDefault("foo");
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
"process_clear_faulty_item_by_other_business_process_group_dlq_uq_jp_0.0.1_dev",
"false",
this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put("key.deserializer", StringDeserializer.class);
consumerProps.put("value.deserializer", StringDeserializer.class);
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<String, String> consumer = cf.createConsumer();
consumer.subscribe(Collections.singleton("process_clear_faulty_item_by_other_business_process_dlq_uq_jp_0.0.1_dev"));
ConsumerRecords<String, String> records = consumer.poll(10_000);
consumer.commitSync();
Assertions.assertThat(records.count()).isGreaterThanOrEqualTo(1);
#### Вот мое приложение. Yaml выглядит так.
spring:
cloud:
stream:
kafka:
streams:
binder:
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
process-clear-faulty-item-by-other-business-process-in:
consumer:
enable-dlq: true
dlq-name: process_clear_faulty_item_by_other_business_process_dlq_uq_jp_0.0.1_dev
dlq-producer-properties:
retries: 1
binder:
brokers: ${spring.embedded.kafka.brokers}
replicationFactor: ${replication_factor:1}
autoCreateTopics: true
autoAddPartitions: true
configuration:
retries: 1
batch.size: 16384
linger.ms: 1
enable.idempotence: true
buffer.memory: 33554432
request.timeout.ms: 3000
transaction.timeout.ms: 3000
max.block.ms: ${kafka_max_block_time:5000}
max.poll.records: 80
poll.timeout: 10000
commit.retries: 1
commit.retry.interval: 1000
session.timeout.ms.config: 50000
shutdown.signal: INT,TERM
acks: "all"
bindings:
process-clear-faulty-item-by-other-business-process-out:
destination: process_clear_faulty_item_by_other_business_process_uq_jp_0.0.1_dev
contentType: application/json
producer:
partitionCount: ${partition_count:1}
process-clear-faulty-item-by-other-business-process-in:
destination: process_clear_faulty_item_by_other_business_process_uq_jp_0.0.1_dev
contentType: application/json
partitioned: true
group: process_clear_faulty_item_by_other_business_process_group_uq_jp_0.0.1_dev