Apache Flink Scala Streaming - PullRequest
       12

Apache Flink Scala Streaming

0 голосов
/ 21 октября 2018

Не могу сделать ни один пример компиляции, не говоря уже о работе.Что я делаю неправильно?Запущено из шаблона быстрого запуска.

package dav.network

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.api.datastream._
import java.util.Properties
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.connectors.cassandra._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.api.common.functions._

object WordCount {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.createRemoteEnvironment(
      "localhost",
      8040,
      "./target/scala-2.11/flink-test-assembly-0.1-SNAPSHOT.jar");

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")
    val kafkaConsumer =
      new FlinkKafkaConsumer011[String]("topic",
                                        new SimpleStringSchema(),
                                        properties);
    val counts = env.addSource(kafkaConsumer)
    .flatMap { _.toLowerCase.split("\\W+") }
    .map { (_, 1) }
    .groupBy(0)
    .sum(1);
    env.execute
  }
}

Не удается скомпилировать:

[info] Loading settings from assembly.sbt ...
[info] Loading project definition from /.../flink-test/project
[info] Loading settings from build.sbt,idea.sbt ...
[info] Set current project to flink-test (in build file:/.../flink-test/)
[success] Total time: 0 s, completed Oct 21, 2018 7:46:13 PM
[info] Updating ...
[info] Done updating.
[info] Compiling 3 Scala sources to /.../flink-test/target/scala-2.11/classes ...
[error] /.../flink-test/src/main/scala/dav/network/WordCount.scala:30:16: missing parameter type for expanded function ((x$1) => x$1.toLowerCase.split("\\W+"))
[error]     .flatMap { _.toLowerCase.split("\\W+") }
[error]                ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 4 s, completed Oct 21, 2018 7:46:17 PM

Единственное, что компилируется, это: val counts: DataStream[(String)] = env.addSource(kafkaConsumer); val counts1=counts.map((_)); .map[String](new MapFunction[String, (String)] { override def map(x: String): (String) = (x) });

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...