Unable to parse the data

val Patterns = ctx.getBroadcastState(patternStateDescriptor)

The imports I used:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
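
The PatternStream and LogsTest case classes are not shown in the question. Judging from how they are constructed in the map operators and which fields are accessed later, they are presumably simple three-field case classes along these lines (the field names here are an assumption, not taken from the original post):

    // Assumed definitions, inferred from how the classes are built and used below
    case class PatternStream(metric: String, operator: String, value: String)
    case class LogsTest(timestamp: String, metric: String, value: String)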

Here is the code:

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val properties = new Properties()
  properties.setProperty("bootstrap.servers","localhost:9092")

  val patternStream = new FlinkKafkaConsumer010("patterns", new SimpleStringSchema, properties)

  val patterns = env.addSource(patternStream)

  var patternData = patterns.map {
    str =>
      val splitted_str = str.split(",")
      PatternStream(splitted_str(0).trim, splitted_str(1).trim, splitted_str(2).trim)
  }

  val logsStream = new FlinkKafkaConsumer010("logs", new SimpleStringSchema, properties)

//  logsStream.setStartFromEarliest()

  val logs = env.addSource(logsStream)

  var data = logs.map {
    str =>
      val splitted_str = str.split(",")
      LogsTest(splitted_str.head.trim, splitted_str(1).trim, splitted_str(2).trim)
  }

  val keyedData: KeyedStream[LogsTest, String] = data.keyBy(_.metric)

  val bcStateDescriptor = new MapStateDescriptor[Unit, PatternStream]("patterns", Types.UNIT, Types.of[PatternStream]) // first type defined is for the key and second data type defined is for the value

  val broadcastPatterns: BroadcastStream[PatternStream]  = patternData.broadcast(bcStateDescriptor)

  val alerts = keyedData
      .connect(broadcastPatterns)
      .process(new PatternEvaluator())

  alerts.print()

//   println(alerts.getClass)
//  val sinkProducer = new FlinkKafkaProducer010("output",  new SimpleStringSchema(), properties)



  env.execute("Flink Broadcast State Job")
}

class PatternEvaluator()
  extends KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)] {

  private lazy val patternStateDescriptor = new MapStateDescriptor("patterns", classOf[String], classOf[String])

  private var lastMetricState: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    val lastMetricDescriptor = new ValueStateDescriptor("last-metric", classOf[String])

    lastMetricState = getRuntimeContext.getState(lastMetricDescriptor)
  }

  override def processElement(reading: LogsTest,
                              readOnlyCtx: KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)]#ReadOnlyContext,
                              out: Collector[(String, String, String)]): Unit = {

    val metrics = readOnlyCtx.getBroadcastState(patternStateDescriptor)
    if (metrics.contains(reading.metric)) {
      val metricPattern: String = metrics.get(reading.metric)
      val metricPatternValue: String = metrics.get(reading.value)
      val lastMetric = lastMetricState.value()

      val logsMetric = (reading.metric)
      val logsValue = (reading.value)


      if (logsMetric == metricPattern) {
        if (metricPatternValue == logsValue) {
          out.collect((reading.timestamp, reading.value, reading.metric))
        }
      }
    }
  }


  override def processBroadcastElement(
                                        update: PatternStream,
                                        ctx: KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)]#Context,
                                        out: Collector[(String, String, String)]
                                      ): Unit = {
    val patterns = ctx.getBroadcastState(patternStateDescriptor)

    if (update.metric == "IP") {
      patterns.put(update.metric /*,update.operator*/ , update.value)
    }
    //    else if (update.metric == "username"){
    //      patterns.put(update.metric, update.value)
    //    }
    //    else {
    //      println("No required data found")
    //    }
    //  }

  }
}

Sample data. Logs stream:

"21/09/98","IP", "5.5.5.5"

Pattern stream:

"IP","==","5.5.5.5"

I am not able to get the data processed into the desired output, i.e. 21/09/98, IP, 5.5.5.5

There are no errors at the moment; the data simply isn't being matched.

The code does read both streams (verified).

1 Answer


One common source of trouble in cases like this is that the API gives you no control over the order in which the patterns and the data arrive. It is entirely possible that processElement is called before processBroadcastElement has stored any patterns.

...
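
The answer is cut off at this point. A common way to deal with this ordering problem is to buffer incoming events in keyed state until the broadcast state has been populated. Below is a minimal sketch of that idea, not taken from the original answer: it assumes the PatternStream and LogsTest case classes from the question, and the class name BufferingPatternEvaluator and the "buffered-logs" state name are made up for illustration. For simplicity it releases buffered events the next time an element arrives for the same key; ctx.applyToKeyedState (or a timer) could be used to flush them eagerly instead.

    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapStateDescriptor, ReadOnlyBroadcastState}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
    import org.apache.flink.util.Collector

    // Illustrative sketch (not from the original answer): buffer events until patterns arrive
    class BufferingPatternEvaluator
      extends KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)] {

      // Must use the same name and key/value types as the descriptor passed to patternData.broadcast(...)
      private lazy val patternStateDescriptor =
        new MapStateDescriptor("patterns", classOf[String], classOf[String])

      // Keyed list state that parks events arriving before any pattern has been broadcast
      private var bufferState: ListState[LogsTest] = _

      override def open(parameters: Configuration): Unit = {
        bufferState = getRuntimeContext.getListState(
          new ListStateDescriptor("buffered-logs", classOf[LogsTest]))
      }

      // Emit the reading if the broadcast state holds a matching pattern for its metric
      private def matchAndEmit(reading: LogsTest,
                               patterns: ReadOnlyBroadcastState[String, String],
                               out: Collector[(String, String, String)]): Unit = {
        if (patterns.contains(reading.metric) && patterns.get(reading.metric) == reading.value) {
          out.collect((reading.timestamp, reading.metric, reading.value))
        }
      }

      override def processElement(reading: LogsTest,
                                  ctx: KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)]#ReadOnlyContext,
                                  out: Collector[(String, String, String)]): Unit = {
        val patterns = ctx.getBroadcastState(patternStateDescriptor)

        if (!patterns.immutableEntries().iterator().hasNext) {
          // No patterns have been broadcast yet: park the event and wait
          bufferState.add(reading)
        } else {
          // Patterns are available: first drain anything buffered for this key,
          // then evaluate the current event
          val it = bufferState.get().iterator()
          while (it.hasNext) {
            matchAndEmit(it.next(), patterns, out)
          }
          bufferState.clear()
          matchAndEmit(reading, patterns, out)
        }
      }

      override def processBroadcastElement(update: PatternStream,
                                           ctx: KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)]#Context,
                                           out: Collector[(String, String, String)]): Unit = {
        // Store the pattern; buffered events are released by the next processElement call
        ctx.getBroadcastState(patternStateDescriptor).put(update.metric, update.value)
      }
    }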