Как изменить ключ и значения KStream в программе подсчета слов Kafka? - PullRequest
1 голос
/ 21 марта 2020

Я новичок в Kafka Streams и вроде застрял в базовой c программе подсчета слов. В приведенной ниже программе я пытаюсь изменить регистр значения, но он не работает (val wordCountInputProcessed = wordCountInput.mapValues(value => value.toLowerCase)). Здесь что-то не так?

kafka stream version => 2.3.0

Scala version => 2.11.8

import java.util._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.streams.{KafkaStreams,StreamsBuilder, StreamsConfig}
import org.apache.kafka.common.serialization.{StringDeserializer,LongDeserializer}

object WordCount {
  def main(args: Array[String]): Unit = {

    val config = new Properties()

    config.put(StreamsConfig.APPLICATION_ID_CONFIG,"word-count-example")
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")
    config.put(ConsumerConfig.AutoOffsetReset,"earliest")
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,classOf[StringDeserializer])
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,classOf[StringDeserializer])



    val builder = new StreamsBuilder
    val wordCountInput = builder.stream[String,String]("streams-plaintext-input")

    val wordCountInputProcessed = wordCountInput.mapValues(value => value.toLowerCase)

    wordCountInputProcessed.to("streams-plaintext-output")

    val streams = new KafkaStreams(builder.build(),config)
    streams.start()
    println(streams.toString)

  }
}

Вот снимок этого выпуск.

enter image description here

Разве это не строка вместо Ничего ?

enter image description here

Ответы [ 2 ]

3 голосов
/ 21 марта 2020

Вы должны переназначить преобразованный KStream на переменную KStream wordCountInput, в противном случае wordCountInput все еще получил первоначальный KStream, что-то вроде этого:

wordCountInput = wordCountInput.mapValues(value => value.toLowerCase)

Обновлено

Я делаю некоторые другие изменения, и приложение работает нормально.

  1. Потоки Кафки, использующие класс SerDes для обтекания StringSerializer/StringDeserializer, so change SERDES class config from StringSerializer / StringDeserializer to SerdeString` :
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,classOf[StringSerde])
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,classOf[StringSerde])
Дополнительный совет: будет проще, если вы добавите отладку в Stream DSL, чтобы проверить, получаете ли вы новое сообщение или нет, я обычно отлаживаю так:
val wordCountInputProcessed = wordCountInput
      .mapValues(value => {
        println("origin " + value)
        println("lowercase " + value.toLowerCase)
        value.toLowerCase
      })

Вы также можете положить отладку внутрь mapValues.

Обновление 1

Обновление полной версии приложения


import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes.StringSerde
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

object WordCount {
  def main(args: Array[String]): Unit = {

    val config = new Properties

    config.put(StreamsConfig.APPLICATION_ID_CONFIG,"word-count-example")
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest")
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,classOf[StringSerde])
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,classOf[StringSerde])



    val builder = new StreamsBuilder
    val wordCountInput = builder.stream[String,String]("streams-plaintext-input")

    val wordCountInputProcessed = wordCountInput
      .mapValues(value => {
        println("origin " + value)
        value.toLowerCase
      })

    wordCountInputProcessed.mapValues(value => {
      println("lowercase " + value)
      value
    })

    wordCountInputProcessed.to("streams-plaintext-output")

    val streams = new KafkaStreams(builder.build(),config)
    streams.start()
    println(streams.toString)

  }
}
2 голосов
/ 22 марта 2020

Я перешел на DSL потоков Кафки для scala API с Java, и это решило проблему. Я также использую следующие модули по соответствующим причинам.

org.apache.kafka.streams.scala.ImplicitConversions: Модуль, который включает в себя неявные преобразования между классами Scala и Java.

org.apache.kafka.streams.scala.Serdes: Модуль который содержит все примитивные SerD, которые могут быть импортированы как имплициты, и помощник для создания пользовательских SerD.

Пожалуйста, обратитесь к этой документации для получения более подробной информации (Topi c: KAFKA STREAMS DSL FOR SCALA) => https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#scala -dsl

import java.time.Duration
import java.util._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.StreamsBuilder

// Import for Scala DSL
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

object WordCount {

  def main(args: Array[String]): Unit = {

    val config = new Properties()

    config.put(StreamsConfig.APPLICATION_ID_CONFIG,"word-count-example")
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest")
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,classOf[Serdes.StringSerde])
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,classOf[Serdes.LongSerde])

    val builder = new StreamsBuilder
    val wordCountInput = builder.stream[String,String]("streams-plaintext-input")

    val wordCountInputProcessed = wordCountInput.mapValues(value => value.toLowerCase())
        .flatMapValues(x=>x.split(" "))
        .selectKey((key,value) => value)
        .groupByKey
        .count

    wordCountInputProcessed.toStream.to("streams-plaintext-output")

    val streams = new KafkaStreams(builder.build(),config)
    streams.start()
    println(streams.toString)

    sys.ShutdownHookThread {
      streams.close(Duration.ofSeconds(10))
    }

  }
}
...