Задать временную метку в выходных данных с помощью Kafka Streams не удалось для преобразований - PullRequest
0 голосов
/ 06 ноября 2018

Предположим, у нас есть трансформатор (написанный на Scala)

new Transformer[String, V, (String, V)]() {
  var context: ProcessorContext = _

  override def init(context: ProcessorContext): Unit = {
    this.context = context
  }

  override def transform(key: String, value: V): (String, V) = {
    val timestamp = toTimestamp(value)
    context.forward(key, value, To.all().withTimestamp(timestamp))
    key -> value
  }

  override def close(): Unit = ()
}

, где toTimestamp - это просто функция, которая возвращает отметку времени, извлеченную из значения записи. Как только это выполнено, есть NPE:

Exception in thread "...-6f3693b9-4e8d-4e65-9af6-928884320351-StreamThread-5" java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:110)
    at CustomTransformer.transform()
    at CustomTransformer.transform()
    at org.apache.kafka.streams.scala.kstream.KStream$$anon$1$$anon$2.transform(KStream.scala:302)
    at org.apache.kafka.streams.scala.kstream.KStream$$anon$1$$anon$2.transform(KStream.scala:300)
    at 

что по существу происходит, так это то, что ProcessorContextImpl терпит неудачу в:

public <K, V> void forward(final K key, final V value, final To to) {
    toInternal.update(to);
    if (toInternal.hasTimestamp()) {
        recordContext.setTimestamp(toInternal.timestamp());
    }
    final ProcessorNode previousNode = currentNode();

потому что recordContext не был инициализирован (и это могло быть сделано только внутренне KafkaStreams).

Это дополнительный вопрос Установка метки времени при выводе с Kafka Streams 1

Ответы [ 2 ]

0 голосов
/ 07 ноября 2018

@ matthias-j-sax Такое же поведение, если процессор повторно используется в коде Java.

    Topology topology = new Topology();
    MyProcessor myProcessor = new MyProcessor();
    topology.addSource("source", "topic-1")
            .addProcessor(
                    "processor",
                    () -> {
                        return myProcessor;
                    },
                    "source"
            )
            .addSink("sink", "topic-2", "processor");
    KafkaStreams streams = new KafkaStreams(topology, config);
    streams.start();
0 голосов
/ 06 ноября 2018

Если вы работаете с transformer, вам нужно убедиться, что новый объект Transformer создается при вызове TransformerSupplier#get(). (ср. https://docs.confluent.io/current/streams/faq.html#why-do-i-get-an-illegalstateexception-when-accessing-record-metadata)

В первоначальном вопросе я думал, что это ваша переменная context, которая приводит к NPE, но теперь я понял, что это внутренняя структура Kafka Streams.

В Scala API есть ошибка в 2.0.0, которая может привести к тому, что один и тот же экземпляр Transformer будет повторно использован (https://issues.apache.org/jira/browse/KAFKA-7250). I думает , что вы нажали эту ошибку. Немного переписав ваш код, вы исправите ошибки. Обратите внимание, что Kafka 2.0.1 и Kafka 2.1.0 содержат исправление.

...