Spark Структурированный поток null в выходном наборе данных - PullRequest
0 голосов
/ 05 марта 2019

Я запускаю Scala-код, который объединяет данные и выводит вывод на консоль.К сожалению, я получил нулевые значения после групповой операции. Токовый выход:

| Id | Date |Count |
| null | null |35471 |

Я понял, что горлышко бутылки - это точка, когда я группирую данные - когда я пытаюсь использовать столбец, отличный от числового, вывод возвращает нули.Любой совет будет приветствоваться - я потерял часы, чтобы найти решение.

Мой код:

// create schema
val sensorsSchema = new StructType()
  .add("SensorId", IntegerType)
  .add("Timestamp", TimestampType)
  .add("Value", DoubleType)
  .add("State", StringType)

// read streaming data from csv...

// aggregate streaming data
val streamAgg = streamIn
  .withColumn("Date", to_date(unix_timestamp($"Timestamp", "dd/MM/yyyy").cast(TimestampType)))
  .groupBy("SensorId", "Date")
  .count()

// write streaming data...

1 Ответ

0 голосов
/ 05 марта 2019

Я меняю код - теперь отлично работает:

/****************************************
* STREAMING APP
* 1.0 beta
*****************************************
* read data from csv (local)
* and save as parquet (local)
****************************************/

package tk.streaming

import org.apache.spark.SparkConf
import org.apache.spark.sql._
// import org.apache.spark.sql.functions._

case class SensorsSchema(SensorId: Int, Timestamp: String, Value: Double, State: String, OperatorId: Int)


object Runner {

  def main(args: Array[String]): Unit = {

    // Configuration parameters (to create spark session and contexts)
    val appName = "StreamingApp" // app name
    val master = "local[*]" // master configuration
    val dataDir = "/home/usr_spark/Projects/SparkStreaming/data"
    val refreshInterval = 30 // seconds


    // initialize context
    val conf = new SparkConf().setMaster(master).setAppName(appName)
    val spark = SparkSession.builder.config(conf).getOrCreate()


    import spark.implicits._

    // TODO change file source to Kafka (must)

    // read streaming data
    val sensorsSchema = Encoders.product[SensorsSchema].schema
    val streamIn = spark.readStream
      .format("csv")
      .schema(sensorsSchema)
      .load(dataDir + "/input")
      .select("SensorId", "Timestamp", "State", "Value") // remove "OperatorId" column


    // TODO save result in S3 (nice to have)

    // write streaming data
    import org.apache.spark.sql.streaming.Trigger
    val streamOut = streamIn.writeStream
      .queryName("streamingOutput")
      .format("parquet")
      .option("checkpointLocation", dataDir + "/output/checkpoint")
      .option("path", dataDir + "/output")
      .start()

    streamOut.awaitTermination() // start streaming data

  }
}
...