Apache flink (1.9.1) исключение времени выполнения при использовании case-классов в scala (2.12.8) - PullRequest
0 голосов
/ 23 января 2020

Я использую класс case в приложении Scala (2.12.8) Apache Flink (1.9.1). Я получаю следующее исключение, когда запускаю код ниже Caused by: java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V.

ПРИМЕЧАНИЕ. Я использовал конструктор по умолчанию согласно предложению (java .lang.NoSuchMethodException для метода init в Scala case case ), но в моем случае это не работает

Вот полный код

Исключение времени выполнения:

    at com.zignallabs.AddCount.<init>(WordCount.scala:7)
    at com.zignallabs.WordCount$.$anonfun$main$1(WordCount.scala:31)
    at org.apache.flink.api.scala.DataSet$$anon$1.map(DataSet.scala:490)
    at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:196)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

Я собираю и запускаю flink локально, используя локальный кластер flink с версией flink 1.9.1.

Вот файл build.sbt:

name := "flink191KafkaScala"

version := "0.1-SNAPSHOT"

organization := "com.zignallabs"

scalaVersion := "2.12.8"

val flinkVersion = "1.9.1"

//javacOptions ++= Seq("-source", "1.7", "-target", "1.7")

val http4sVersion = "0.16.6"

resolvers ++= Seq(
  "Local Ivy" at "file:///"+Path.userHome+"/.ivy2/local",
  "Local Ivy Cache" at "file:///"+Path.userHome+"/.ivy2/cache",
  "Local Maven Repository" at "file:///"+Path.userHome+"/.m2/repository",
  "Artifactory Cache" at "https://zignal.artifactoryonline.com/zignal/zignal-repos"
)

val excludeCommonsLogging = ExclusionRule(organization = "commons-logging")

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",

  "org.apache.flink" %% "flink-clients" % "1.9.1",
  // Upgrade to flink-connector-kafka_2.11
  "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.9.1",
  //"org.scalaj" %% "scalaj-http" % "2.4.2",
  "com.squareup.okhttp3" % "okhttp" % "4.2.2"
)

publishTo := Some("Artifactory Realm" at "https://zignal.artifactoryonline.com/zignal/zignal")

credentials += Credentials("Artifactory Realm", "zignal.artifactoryonline.com", "buildserver", "buildserver")

//mainClass in Compile := Some("com.zignallabs.StoryCounterTopology")
mainClass in Compile := Some("com.zignallabs.WordCount")

scalacOptions ++= Seq(
  "-feature",
  "-unchecked",
  "-deprecation",
  "-language:implicitConversions",
  "-Yresolve-term-conflict:package",
  "-language:postfixOps",
  "-target:jvm-1.8")


lazy val root = project.in(file(".")).configs(IntegrationTest)

Ответы [ 2 ]

0 голосов
/ 13 марта 2020

Теперь я могу запустить это приложение, используя Scala 2.12. Проблема была в окружающей среде. Мне нужно было убедиться, что двоичные файлы конфликтов отсутствуют, особенно для scala 2.11 и scala 2.12

0 голосов
/ 24 января 2020

Если вы используете аргументы по умолчанию для конструкторов класса case, гораздо более идиоматично c Scala определять их следующим образом:

case class AddCount ( firstP: String = "default", count: Int = 1)

This является syntacti c сахар, который в основном бесплатно дает вам следующее:

case  class  AddCount ( firstP: String, count: Int) {
  def this () = this ("default", 1)

  def this (firstP:String) = this (firstP, 1)

  def this (count:Int) = this ("default", count)

}
...