Я унаследовал этот код и не могу понять, почему он не работает.Это должно работать, но, очевидно, это не так.Он не компилируется.
Я искал код в GitHub , но в scala нет примеров, только java.
Моя среда osx 10.14.4, запуск zookeeper и kafka в локальном режиме:
zkServer print-cmd
ZooKeeper JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
"java" -Dzookeeper.log.dir="." -Dzookeeper.root.logger="INFO,CONSOLE" -cp "/usr/local/Cellar/zookeeper/3.4.13/libexec/bin/../build/classes:/usr/local/Cellar/zookeeper/3.4.13/libexec/bin/../build/lib/*.jar:/usr/local/Cellar/zookeeper/3.4.13/libexec/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/local/Cellar/zookeeper/3.4.13/libexec/bin/../lib/slf4j-api-1.7.25.jar:/usr/local/Cellar/zookeeper/3.4.13/libexec/bin/../lib/netty-3.10.6.Final.jar:/usr/local/Cellar/zookeeper/3.4.13/libexec/bin/../lib/log4j-1.2.17.jar:/usr/local/Cellar/zookeeper/3.4.13/libexec/bin/../lib/jline-0.9.94.jar:/usr/local/Cellar/zookeeper/3.4.13/libexec/bin/../lib/audience-annotations-0.5.0.jar:/usr/local/Cellar/zookeeper/3.4.13/libexec/bin/../zookeeper-3.4.13.jar:/usr/local/Cellar/zookeeper/3.4.13/libexec/bin/../src/java/lib/*.jar:/usr/local/etc/zookeeper:" -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain "/usr/local/etc/zookeeper/zoo.cfg" > "./zookeeper.out" 2>&1 < /dev/null
Я использую kafka 2.2.0
Вот что у меня есть:
build.sbt
name := "Chapter3"
version := "0.0.1-SNAPSHOT"
scalaVersion := "2.12.8"
resolvers ++= Seq(
"Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases/",
"apache snapshots" at "http://repository.apache.org/snapshots/",
"confluent.io" at "http://packages.confluent.io/maven/"
)
val kafka_streams_scala_version = "0.1.0"
libraryDependencies ++= Seq("com.lightbend" %% "kafka-streams-scala" % kafka_streams_scala_version)
Это тест:
import java.lang.Long **// UNUSED IMPORT**
import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.streams.kstream.{KGroupedStream, KStream, KTable, Materialized} **// UNUSED IMPORT, KGroupedStream and Materialized**
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.common.serialization.Serdes
import Serdes._ **// UNUSED IMPORT**
import scala.language.implicitConversions
import scala.util.Try // UNUSED IMPORT
case class PharmaClass(val country: String,
val year: String,
val pcnt_health_expend: String,
val pcnt_gdp: String,
val usd_capital_expend: String,
val flag_codes: String,
val total_expend: String)
object KafkaStreams {
def main(args: Array[String]): Unit = {
val config: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
p
}
//implicit val pharmaClassSerde: Serde[PharmaClass] = new AvroSerde
val builder: StreamsBuilder = new StreamsBuilder
val textLines: KStream[String, String] = builder.stream[String, String]("pharma")
val eachLinesAsArray : KStream[String, Array[String]] = textLines
.mapValues(x => x.split(","))**// missing parameter type: x** [error].mapValues(x => x.split(",")
^
val mapToPharmaClass: KStream[String, PharmaClass] = eachLinesAsArray.
mapValues { x => // // missing parameter type
^
{
PharmaClass(
country = x(0),
year = x(1),
pcnt_health_expend = x(2),
pcnt_gdp = x(3),
usd_capital_expend = x(4),
flag_codes = x(5),
total_expend = x(6)
)
}
}
/*sum of expenditure on health per per country*/
**// missing parameter type on y variable**
val pcnt_health_ex : KTable[String, Double] = mapToPharmaClass.mapValues( y => (y.country,y.pcnt_health_expend.toDouble))
.groupByKey()
.reduce(_+_)
pcnt_health_ex.toStream.to("pcnt_health_expend")
/*sum of pcnt_gdp in year 2015 per country*/
**// missing parameter type: _**
val pcnt_gdp : KTable[String, Double] = mapToPharmaClass
.filter((_,pharmaclass) => pharmaclass.year == 2015)
.mapValues( y => (y.country,y.pcnt_gdp.toDouble))
.groupByKey()
.reduce(_+_)
pcnt_health_ex.toStream.to("pcnt_gdp")
val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
streams.start()
sys.ShutdownHookThread {
streams.close(10, TimeUnit.SECONDS)
}
}
}
Может ли кто-нибудь помочь мне, дав мне подсказку или указав, как решить эту проблему?
Спасибо.