Нулевое значение в KStream.map, вызывающее IllegalArgumentException: полезная нагрузка не должна быть нулевой - PullRequest
0 голосов
/ 21 октября 2019

Описание проблемы:

Я создаю весеннее облачное приложение Kafka stream. У меня есть одна входная тема и одна выходная тема, и я пытаюсь применить операцию преобразования KStream Key-Value для входной темы, используя функцию KStream.map
Проблема, если у меня есть преобразованное значение как null , функция выдает IllegalArgumentException
Мои вопросы:
1: причина исключения? Хотя в документации написано: «Ввод записей с нулевым ключом илинулевое значение игнорируется "
2: лучшие практики для обработки исключений в операциях без сохранения состояния или состояния ? Должна ли быть достаточно попытка / улов окружить весь план обработки? Или я должен иметь попробовать / поймать внутри каждой функции преобразования (например, фильтр, карта, объединить, уменьшить)?

Любые мысли приветствуются.

Конфигурация приложения:

spring:
  application:
    name:kafka-streams-test
  cloud.stream:
    kafka.streams:
      binder:
        brokers: localhost:9093
        configuration:
          commit.interval.ms: 1000
          security.protocol: SASL_PLAINTEXT
          sasl.mechanism: GSSAPI
          sasl.kerberos.service.name: kafka
        serdeError: logAndContinue
      bindings:
        streams-words-input:
          consumer:
            application-id: Input-Words
        streams-words-output:
          consumer:
            application-id: Output-Words
    bindings:
      streams-words-input:
        destination: streams-words-input
      streams-words-output:
        destination: streams-words-output

Пример кода:

@StreamListener()
@SendTo("streams-words-output")
public KStream<String, Long> createWords(
    @Input("streams-words-input") final KStream<String, String> wordsInput){
    return wordsInput
            .map((key,value) -> KeyValue.pair(key, null));
}

Stacktrace:

java.lang.IllegalArgumentException: Payload must not be null
at org.springframework.util.Assert.notNull(Assert.java:198)
at org.springframework.messaging.support.MessageBuilder.<init>(MessageBuilder.java:57)
at org.springframework.messaging.support.MessageBuilder.withPayload(MessageBuilder.java:179)
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate.lambda$serializeOnOutbound$0(KafkaStreamsMessageConversionDelegate.java:86)
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate$$Lambda$743/1725151361.apply(Unknown Source)
at org.apache.kafka.streams.kstream.internals.AbstractStream$2.apply(AbstractStream.java:87)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:33)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:48)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)

1 Ответ

1 голос
/ 21 октября 2019

Исключение, которое вы видите, исходит из давнего правила Spring Messaging - " нет сообщения с нулевой полезной нагрузкой ". Другими словами, нет сообщения для отправки, если нечего сообщать.

Тем не менее, очевидно, что есть проблема с KStream и как он обрабатывает это условие, поэтому я бы предложил поднять проблему в Кафка вяжущее . В то же время вы можете легко добавить операцию filter в свой конвейер, чтобы отфильтровать нули.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...