Любые идеи будут оценены, я пытаюсь написать тест как;
@ExtendWith(SpringExtension.class)
@EmbeddedKafka(count = 1, controlledShutdown = true, topics = { "input", "output" }, brokerProperties = { "broker.id=2",
"listeners=PLAINTEXT://127.0.0.1:9092" })
class BindingTest {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Autowired
private CustomBindings cBindings;
/**
* @throws java.lang.Exception
*/
@BeforeEach
void setUp() throws Exception {
}
/**
* @throws java.lang.Exception
*/
@AfterEach
void tearDown() throws Exception {
embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
}
@Test
void test0() {
String KEY = "KEY";
String testMessage = "TESTMESSAGE";
Message<String> message = MessageBuilder.withPayload(testMessage)
.setHeader(KafkaHeaders.MESSAGE_KEY, KEY).build();
cBindings.output().send(message);
}
@SpringBootApplication
@EnableBinding(CustomBindings.class)
public static class BindingApplication {
}
}
и
spring.cloud.stream.bindings.output.destination = выход
spring.cloud.stream.bindings.output.contentType = применение / JSON
spring.cloud.stream.bindings.output.producer.header режим = сырец
spring.cloud.stream.bindings.output.producer.use родное кодирование = верно
spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde == org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde == org.apache.kafka.common.serialization.StringSerializer
Все еще получает
ошибка произошла в обработчике сообщений [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@71f0806b]; вложенным исключением является org.apache.kafka.common.errors.SerializationException: невозможно преобразовать значение класса [B в класс org.apache.kafka.common.serialization.StringSerializer, указанный в value.serializer, failedMessage = GenericMessage [payload = byte [8], заголовки = {id = c91897bc-2e4e-0a74-bc05-17fb31b690f6, kafka_messageKey = KEY, contentType = application / json, отметка времени = 1542295539746}]
Что не имеет смысла для меня