Ошибка: использование Spark Structured Streaming для чтения и записи данных в другой topi c в kafka - PullRequest
0 голосов
/ 26 мая 2020

Я выполняю небольшую задачу по чтению файла access_logs с помощью kafka topi c, затем я подсчитываю статус и отправляю количество Status другому kafka topi c. Но я продолжаю получать ошибки, например, пока я не использую режим вывода или режим добавления:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;

При использовании полного режима:

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: requirement failed: KafkaTable does not support Complete mode.

Это мой код: structuredStreaming. scala

package com.spark.sparkstreaming

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.log4j._
import org.apache.spark.sql.functions._

import java.util.regex.Pattern
import java.util.regex.Matcher
import java.text.SimpleDateFormat
import java.util.Locale

import Utilities._
object structuredStreaming {

  case class LogEntry(ip:String, client:String, user:String, dateTime:String, request:String, status:String, bytes:String, referer:String, agent:String)

  val logPattern = apacheLogPattern()
  val datePattern = Pattern.compile("\\[(.*?) .+]")

  def parseDateField(field: String): Option[String] = {

    val dateMatcher = datePattern.matcher(field)
    if (dateMatcher.find) {
      val dateString = dateMatcher.group(1)
      val dateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH)
      val date = (dateFormat.parse(dateString))
      val timestamp = new java.sql.Timestamp(date.getTime());
      return Option(timestamp.toString())
    } else {
      None
    }
  }

  def parseLog(x:Row) : Option[LogEntry] = {

    val matcher:Matcher = logPattern.matcher(x.getString(0));
    if (matcher.matches()) {
      val timeString = matcher.group(4)
      return Some(LogEntry(
        matcher.group(1),
        matcher.group(2),
        matcher.group(3),
        parseDateField(matcher.group(4)).getOrElse(""),
        matcher.group(5),
        matcher.group(6),
        matcher.group(7),
        matcher.group(8),
        matcher.group(9)
      ))
    } else {
      return None
    }
  }

  def main(args: Array[String]) {

    val spark = SparkSession
      .builder
      .appName("StructuredStreaming")
      .master("local[*]")
      .config("spark.sql.streaming.checkpointLocation", "/home/UDHAV.MAHATA/Documents/Checkpoints")
      .getOrCreate()

    setupLogging()

//    val rawData = spark.readStream.text("/home/UDHAV.MAHATA/Documents/Spark/logs")
    val rawData = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "testing")
      .load()
    import spark.implicits._

    val structuredData = rawData.flatMap(parseLog).select("status")
    val windowed = structuredData.groupBy($"status").count()
    //val query = windowed.writeStream.outputMode("complete").format("console").start()
    val query = windowed
      .writeStream
        .outputMode("complete")
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "sink")
      .start()
    query.awaitTermination()
    spark.stop()
  }

}

Утилиты. scala

package com.spark.sparkstreaming

import org.apache.log4j.Level
import java.util.regex.Pattern
import java.util.regex.Matcher

object Utilities {
  def setupLogging() = {
    import org.apache.log4j.{Level, Logger}
    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)
  }
 def apacheLogPattern():Pattern = {
    val ddd = "\\d{1,3}"
    val ip = s"($ddd\\.$ddd\\.$ddd\\.$ddd)?"
    val client = "(\\S+)"
    val user = "(\\S+)"
    val dateTime = "(\\[.+?\\])"
    val request = "\"(.*?)\""
    val status = "(\\d{3})"
    val bytes = "(\\S+)"
    val referer = "\"(.*?)\""
    val agent = "\"(.*?)\""
    val regex = s"$ip $client $user $dateTime $request $status $bytes $referer $agent"
    Pattern.compile(regex)
  }
}

Может ли кто-нибудь помочь мне, где я делаю ошибку?

1 Ответ

1 голос
/ 26 мая 2020

Как указано в сообщении об ошибке, вам необходимо добавить водяной знак в вашу группу.

Замените эту строку

val windowed = structuredData.groupBy($"status").count()

на

import org.apache.spark.sql.functions.{window, col}

val windowed = structuredData.groupBy(window(col("dateTime"), "10 minutes"), "status").count()

Это Важно, чтобы столбец dateTime имел тип timestamp, который вы все равно анализируете из источника Kafka, если я правильно понял ваш код.

Без окна Spark не будет знать, сколько данных нужно агрегировать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...