Kafka Streams - переслать запись в init () - PullRequest
0 голосов
/ 15 марта 2019

В пользовательском Transformer мы пытаемся создать и переслать некоторые записи в методе init(..). Согласно JavaDoc Kafka Streams, это должно быть возможно. Однако это не работает, и мы получаем следующее исключение:

Exception in thread "my-app-0.0.16-3be1aa47-d51d-4b0c-821a-a0b850359490-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-TRANSFORM-0000000004
at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:95)
at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:517)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:267)
at org.apache.kafka.streams.processor.internals.AssignedTasks.transitionToRunning(AssignedTasks.java:253)
at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:79)
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:318)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: java.lang.NullPointerException
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)

Я неправильно понимаю JavaDoc, и просто невозможно отправить записи из init?

1 Ответ

1 голос
/ 15 марта 2019

Я думаю, вы неправильно поняли API преобразования . Согласно определению, преобразуйте каждую запись входного потока в ноль или более записей в выходном потоке.

Внутри Transformer,состояние получается через интерфейс ProcessorContext.Processor имеет метод init. Метод init () передает экземпляр ProcessorContext, который обеспечивает доступ к метаданным текущей обработанной записи, включая ее исходную тему и раздел Kafka, соответствующее смещение сообщенияи далее такая информация.

Теперь у нас есть доступ к контексту с использованием метода init, затем мы можем вызвать метод forward из контекста, но не внутри метода init.

Этот пример можно увидеть, чтобы понять .Надеюсь, это сработает для вас.

...