I can't get a single example to compile, let alone run. What am I doing wrong? The project was started from the quick-start template.
package dav.network
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.api.datastream._
import java.util.Properties
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.connectors.cassandra._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.api.common.functions._
object WordCount {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.createRemoteEnvironment(
      "localhost",
      8040,
      "./target/scala-2.11/flink-test-assembly-0.1-SNAPSHOT.jar");

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")

    val kafkaConsumer =
      new FlinkKafkaConsumer011[String]("topic",
        new SimpleStringSchema(),
        properties);

    val counts = env.addSource(kafkaConsumer)
      .flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1);

    env.execute
  }
}
It fails to compile:
[info] Loading settings from assembly.sbt ...
[info] Loading project definition from /.../flink-test/project
[info] Loading settings from build.sbt,idea.sbt ...
[info] Set current project to flink-test (in build file:/.../flink-test/)
[success] Total time: 0 s, completed Oct 21, 2018 7:46:13 PM
[info] Updating ...
[info] Done updating.
[info] Compiling 3 Scala sources to /.../flink-test/target/scala-2.11/classes ...
[error] /.../flink-test/src/main/scala/dav/network/WordCount.scala:30:16: missing parameter type for expanded function ((x$1) => x$1.toLowerCase.split("\\W+"))
[error] .flatMap { _.toLowerCase.split("\\W+") }
[error] ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 4 s, completed Oct 21, 2018 7:46:17 PM
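From the error, my guess (untested) is that I'm mixing the Java streaming classes (org.apache.flink.streaming.api.environment._ and org.apache.flink.streaming.api.datastream._) with Scala closures: the Java DataStream.flatMap expects a FlatMapFunction, so the compiler can't infer the closure's parameter type; and groupBy doesn't exist on a streaming DataStream anyway. Below is a sketch of what I'd expect the Scala-API version to look like, assuming the imports are the problem (keyBy replacing groupBy; SimpleStringSchema taken from org.apache.flink.api.common.serialization, which replaces the deprecated streaming.util.serialization one):

import java.util.Properties

// Scala streaming API: brings the Scala StreamExecutionEnvironment, the Scala
// DataStream, and the implicit TypeInformation that closure-based operators need.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.api.common.serialization.SimpleStringSchema

object WordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createRemoteEnvironment(
      "localhost",
      8040,
      "./target/scala-2.11/flink-test-assembly-0.1-SNAPSHOT.jar")

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")

    val kafkaConsumer =
      new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), properties)

    val counts = env.addSource(kafkaConsumer)
      .flatMap { _.toLowerCase.split("\\W+") }  // closure compiles: the Scala API supplies the types
      .map { (_, 1) }
      .keyBy(0)                                 // streaming uses keyBy, not groupBy
      .sum(1)

    counts.print()
    env.execute("wordcount")
  }
}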
The only thing I've managed to get compiling is:
val counts: DataStream[(String)] = env.addSource(kafkaConsumer);
val counts1 = counts.map((_))
  .map[String](new MapFunction[String, (String)] {
    override def map(x: String): (String) = (x)
  });
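And presumably flatMap hits the same wall: with the Java classes it wants the explicit interface too. Something like the following should be the equivalent workaround (again just a sketch, untested):

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector

// Same workaround style as the MapFunction above: spell out the Java
// interface instead of passing a Scala closure to the Java DataStream.
val words = counts.flatMap(new FlatMapFunction[String, String] {
  override def flatMap(line: String, out: Collector[String]): Unit =
    line.toLowerCase.split("\\W+").foreach(out.collect)
})

But that's exactly the boilerplate the Scala imports are supposed to remove, so I'd rather understand how to get the Scala API working.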