Выход Spark Structured Streaming не отображается в консоли inteliJ - PullRequest
0 голосов
/ 30 ноября 2018

Я пытаюсь эмулировать этот пример из книги Яцека Ласковского о чтении CSV-файла и агрегировании данных в консоли, но по какой-то причине вывод не отображается в консоли InteliJ.

scala> spark.version
res4: String = 2.2.0

В некоторых местах я нашел ссылку ( 1 , 2 , 3 , 4 , 5 ) здесь, в SO, я попробовал все, но я не решил проблему.

Это код:

package org.sample

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

object App {
  def main(args : Array[String]): Unit = {

    val DIR = new java.io.File(".").getCanonicalPath + "dataset/stream_in"

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Spark Structured Streaming Job")

    val spark = SparkSession.builder()
      .appName("Spark Structured Streaming Job")
      .master("local[*]")
      .getOrCreate()

    val reader = spark.readStream
      .format("csv")
      .option("header", true)
      .option("delimiter", ";")
      .option("latestFirst", "true")
      .schema(SchemaDefinition.csvSchema)
      .load(DIR + "/*")

    reader.createOrReplaceTempView("user_records")

    val tranformation = spark.sql(
      """
        SELECT carrier, marital_status, COUNT(1) as num_users
        FROM user_records
        GROUP BY carrier, marital_status
      """
    )

    val consoleStream = tranformation
      .writeStream
      .format("console")
      .option("truncate", false)
      .outputMode("complete")
      .start()

    consoleStream.awaitTermination()
  }
}

Мой вывод это только:

18/11/30 15:40:31 INFO StreamExecution: Streaming query made progress: {
  "id" : "9420f826-0daf-40c9-a427-e89ed42ee738",
  "runId" : "991c9085-3425-4ea6-82af-4cef20007a66",
  "name" : null,
  "timestamp" : "2018-11-30T14:40:31.117Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 2,
    "triggerExecution" : 2
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/structured-streamming-taskdataset/stream_in/*]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6a62e7ef"
  }
}

1 Ответ

0 голосов
/ 01 декабря 2018

Я переопределяю файл и сейчас у меня работает:

Отличия:

  1. Удалите ненужные confSparkSession нам не нужно вызывать conf
  2. .load(/*) не работает.Работало только сохранение пути dataset/stream_in;
  3. Данные в tranformation были неправильными (поля не совпадали с файлом)

Окончательный код:

package org.sample

import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}

object StreamCities {

  def main(args : Array[String]): Unit = {

    // Turn off logs in console
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val spark = SparkSession.builder()
      .appName("Spark Structured Streaming get CSV and agregate")
      .master("local[*]")
      .getOrCreate()

    // 01. Schema Definition: We'll put the structure of our
    // CSV file. Can be done using a class, but for simplicity
    // I'll keep it here
    import org.apache.spark.sql.types._
    def csvSchema = StructType {
      StructType(Array(
        StructField("id", StringType, true),
        StructField("name", StringType, true),
        StructField("city", StringType, true)
      ))
    }

    // 02. Read the Stream: Create DataFrame representing the
    // stream of the CSV according our Schema. The source it is
    // the folder in the .load() option
    val users = spark.readStream
      .format("csv")
      .option("sep", ",")
      .option("header", true)
      .schema(csvSchema)
      .load("dataset/stream_in")

    // 03. Aggregation of the Stream: To use the .writeStream()
    // we must pass a DF aggregated. We can do this using the
    // Untyped API or SparkSQL

    // 03.1: Aggregation using untyped API
    //val aggUsers = users.groupBy("city").count()

    // 03.2: Aggregation using Spark SQL
    users.createOrReplaceTempView("user_records")

    val aggUsers = spark.sql(
      """
        SELECT city, COUNT(1) as num_users
        FROM user_records
        GROUP BY city"""
    )

    // Print the schema of our aggregation
    println(aggUsers.printSchema())

    // 04. Output the stream: Now we'll write our stream in
    // console and as new files will be included in the folder
    // that Spark it's listening the results will be updated
    val consoleStream = aggUsers.writeStream
      .outputMode("complete")
      .format("console")
      .start()
      .awaitTermination()
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...