Вы должны переназначить преобразованный KStream на переменную KStream wordCountInput
, в противном случае wordCountInput
все еще получил первоначальный KStream, что-то вроде этого:
wordCountInput = wordCountInput.mapValues(value => value.toLowerCase)
Обновлено
Я делаю некоторые другие изменения, и приложение работает нормально.
- Потоки Кафки, использующие класс
SerDes
для обтекания StringSerializer/StringDeserializer, so change SERDES class config from
StringSerializer / StringDeserializer to
SerdeString` :
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,classOf[StringSerde])
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,classOf[StringSerde])
Дополнительный совет: будет проще, если вы добавите отладку в Stream DSL, чтобы проверить, получаете ли вы новое сообщение или нет, я обычно отлаживаю так:
val wordCountInputProcessed = wordCountInput
.mapValues(value => {
println("origin " + value)
println("lowercase " + value.toLowerCase)
value.toLowerCase
})
Вы также можете положить отладку внутрь mapValues
.
Обновление 1
Обновление полной версии приложения
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes.StringSerde
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
object WordCount {
def main(args: Array[String]): Unit = {
val config = new Properties
config.put(StreamsConfig.APPLICATION_ID_CONFIG,"word-count-example")
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest")
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,classOf[StringSerde])
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,classOf[StringSerde])
val builder = new StreamsBuilder
val wordCountInput = builder.stream[String,String]("streams-plaintext-input")
val wordCountInputProcessed = wordCountInput
.mapValues(value => {
println("origin " + value)
value.toLowerCase
})
wordCountInputProcessed.mapValues(value => {
println("lowercase " + value)
value
})
wordCountInputProcessed.to("streams-plaintext-output")
val streams = new KafkaStreams(builder.build(),config)
streams.start()
println(streams.toString)
}
}