Как расширить трансформатор в Кафка Скала? - PullRequest
0 голосов
/ 24 октября 2019

Я работаю над реализацией потокового счетчика Kafka счетчика слов в Scala, в котором я расширил преобразователь:

class WordCounter extends Transformer[String, String, (String, Long)]

Затем он вызывается в потоке следующим образом:

val counter: KStream[String, Long] = filtered_record.transform(new WordCounter, "count")

Тем не менее, я получаю сообщение об ошибке ниже при запуске моей программы через sbt:

[error]  required: org.apache.kafka.streams.kstream.TransformerSupplier[String,String,org.apache.kafka.streams.KeyValue[String,Long]]

Я не могу понять, как это исправить, и не мог найти подходящий Kafka пример подобногореализация. Кто-нибудь понял, что я делаю неправильно?

1 Ответ

3 голосов
/ 24 октября 2019

Подпись transform():

  def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]],
                        stateStoreNames: String*): KStream[K1, V1]

Таким образом, transform() принимает TransformerSupplier в качестве первого аргумента, а не Transformer.

См. Также Javadocs

...