Flink CEP Обнаружена неизвестная ошибка в IntelliJ IDE - PullRequest
0 голосов
/ 13 января 2020

Я начал изучать Apache библиотеки CEP Флинка на Scala языке, и когда я пытался создать PatternStream, выполнив CEP.pattern(input,pattern), как показано в руководстве по https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html, IDE говорит, что «Не удается разрешить перегруженный метод», ссылаясь на метод pattern. В соответствии с реализацией readTextFile и Pattern[String].begin('line').where(_.length == 10), которые я использовал для создания входных данных и шаблона соответственно, не должно быть проблем с аргументами метода или обобщенными типами c.

Вот код, который я написал. Я знаю, что он не завершен, но я все равно не смог его завершить, так как возникла эта проблема.

package FlinkCEPClasses

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.cep.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

class FlinkCEPPipeline {

  var props : Properties = new Properties()

  var env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(1)

  var input : DataStream[String] = env.readTextFile("/home/luca/Desktop/lines")

  var patt : Pattern[String,String] = Pattern.begin[String]("igual").where(_.length == 10)

  // Problem appears at the following line. A red subscript appears at the pattern method, 
  // saying the following: "Cannot resolve overloaded method"

  var CEPstream = CEP.pattern(input,patt)

  input.writeAsText("/home/luca/Desktop/flinkcepout",FileSystem.WriteMode.OVERWRITE)

  env.execute()


Вот содержание моего файла build.sbt:


name := "FlinkCEP"

version := "0.1"

scalaVersion := "2.12.10"

// https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala
libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-scala
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.9.0"

libraryDependencies += "log4j" % "log4j" % "1.2.17"

libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime

libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.2" % Test```

My Цель этого кода - просто увидеть, что он выполняет простое условие «где», кроме этого у него не должно быть никакой большей утилиты. Я использую IntelliJ в качестве IDE. Также я не уверен, что библиотеки Scala для CEP готовы к использованию. Буду признателен, если кто-нибудь сможет пролить свет на это.

Ответы [ 2 ]

0 голосов
/ 22 января 2020

Что ж, я наконец-то решил проблему, посмотрев примеры github @DavidAnderson. Поскольку я внес много изменений, я не могу точно сказать, будет ли мое решение работать на вас, но я изменил с import org.apache.flink.streaming.api.datastream.DataStream на import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, _}. Следите за неоднозначным импортом и убедитесь, что вы импортируете действительно необходимые классы.

Я перечислю все мои импорты и мой файл build.sbt, чтобы у вас был полный доступ к моей конфигурации.

Импорт

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.cep.scala.PatternStream
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, _}

Build.sbt

name := "FlinkCEP"

version := "0.1"

scalaVersion := "2.12.10"

// https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala
//libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-cep
libraryDependencies += "org.apache.flink" %% "flink-cep" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala
libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-runtime
libraryDependencies += "org.apache.flink" %% "flink-runtime" % "1.9.0" % Test


// https://mvnrepository.com/artifact/org.apache.flink/flink-scala
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.9.0"

libraryDependencies += "log4j" % "log4j" % "1.2.17"

libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime

libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.2" % Test
0 голосов
/ 14 января 2020

Попробуйте это:

import org.apache.flink.cep.scala.PatternStream

...

val CEPstream: PatternStream[String] = CEP.pattern[String](input, patt)

См. github для простого примера, который использует CEP с Scala.

...