Связка весеннего облачного потока - PullRequest
0 голосов
/ 15 ноября 2018

Любые идеи будут оценены, я пытаюсь написать тест как;

@ExtendWith(SpringExtension.class)
@EmbeddedKafka(count = 1, controlledShutdown = true, topics = { "input", "output" }, brokerProperties = { "broker.id=2",
    "listeners=PLAINTEXT://127.0.0.1:9092" })
class BindingTest {

@Autowired
private ApplicationContext applicationContext;

@Autowired
private EmbeddedKafkaBroker embeddedKafka;

@Autowired
private CustomBindings cBindings;

/**
 * @throws java.lang.Exception
 */
@BeforeEach
void setUp() throws Exception {
}

/**
 * @throws java.lang.Exception
 */
@AfterEach
void tearDown() throws Exception {
    embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
}

@Test
void test0() {
    String KEY = "KEY";
    String testMessage = "TESTMESSAGE";
    Message<String> message = MessageBuilder.withPayload(testMessage)
            .setHeader(KafkaHeaders.MESSAGE_KEY, KEY).build();
    cBindings.output().send(message);

}

@SpringBootApplication
@EnableBinding(CustomBindings.class)
public static class BindingApplication {

}

}

и

spring.cloud.stream.bindings.output.destination = выход spring.cloud.stream.bindings.output.contentType = применение / JSON

spring.cloud.stream.bindings.output.producer.header режим = сырец spring.cloud.stream.bindings.output.producer.use родное кодирование = верно

spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde == org.apache.kafka.common.serialization.StringSerializer spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde == org.apache.kafka.common.serialization.StringSerializer

Все еще получает

ошибка произошла в обработчике сообщений [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@71f0806b]; вложенным исключением является org.apache.kafka.common.errors.SerializationException: невозможно преобразовать значение класса [B в класс org.apache.kafka.common.serialization.StringSerializer, указанный в value.serializer, failedMessage = GenericMessage [payload = byte [8], заголовки = {id = c91897bc-2e4e-0a74-bc05-17fb31b690f6, kafka_messageKey = KEY, contentType = application / json, отметка времени = 1542295539746}]

Что не имеет смысла для меня

1 Ответ

0 голосов
/ 15 ноября 2018

Судя по всему, это не приложение Kafka Streams, а обычное приложение Spring Cloud Stream с привязкой Kafka.Следовательно, вам не нужны эти два свойства. spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde==org.apache.kafka.common.serialization.StringSerializer spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde==org.apache.kafka.common.serialization.StringSerializer

Кроме того, чтобы исправить ошибку, вам необходимо удалить эту строку из вашей конфигурации: spring.cloud.stream.bindings.output.producer.use-native-encoding=true.

Устанавливая значение true для собственной кодировки, вы просите Kafka выполнить сериализацию, которая будет опираться на значение по умолчанию ByteArraySerializer.Если вы действительно предполагали собственную сериализацию, вам необходимо установить соответствующее значение сериализатора (StringSerializer).Но так как это тест, я предлагаю вам удалить это свойство и посмотреть, прошел ли ваш тест.

...