Spark Structured Streaming writestream не записывает файл, пока я не остановлю работу - PullRequest
0 голосов
/ 28 февраля 2019

Я использую Spark Structured Streaming для классического варианта использования: я хочу прочитать тему kafka и записать поток в HDFS в формате паркета.

Вот мой код:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{ArrayType, DataTypes, StructType}

object TestKafkaReader extends  App{
  val spark = SparkSession
    .builder
    .appName("Spark-Kafka-Integration")
    .master("local")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")
  import spark.implicits._

  val kafkaDf = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers","KAFKA_BROKER_IP:PORT")
    //.option("subscribe", "test")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .load()

  val moviesJsonDf = kafkaDf.selectExpr("CAST(value AS STRING)")

  // movie struct
  val struct = new StructType()
    .add("title", DataTypes.StringType)
    .add("year", DataTypes.IntegerType)
    .add("cast", ArrayType(DataTypes.StringType))
    .add("genres", ArrayType(DataTypes.StringType))

  val moviesNestedDf = moviesJsonDf.select(from_json($"value", struct).as("movie"))
  // json flatten
  val movieFlattenedDf = moviesNestedDf.selectExpr("movie.title", "movie.year", "movie.cast","movie.genres")


  // convert to parquet and save to hdfs
  val query = movieFlattenedDf
    .writeStream
    .outputMode("append")
    .format("parquet")
    .queryName("movies")
    .option("checkpointLocation", "src/main/resources/chkpoint_dir")
    .start("src/main/resources/output")
    .awaitTermination()
  }

Контекст:

  • Я запускаю это напрямую из intellij (с установленной локальной искрой)
  • Мне удается без проблем читать из kafka и писать в консоли (используярежим консоли)
  • На данный момент я хочу записать файл на локальный компьютер (но я попытался на кластере HDFS, проблема та же)

Моя проблема:

Во время работы в папке ничего не записывается, мне нужно вручную остановить работу, чтобы наконец увидеть файлы.

Я подумал, что, возможно, что-то связано с .awaitTermination()Для информации, я попытался удалить эту опцию, но без этого я получаю сообщение об ошибке, и работа просто не запускается.

Возможно, я не установил правильные опции, но после прочтения много раз документа и поиска поGoogle я ничего не нашел.

Не могли бы вы помочь мне в этом?

Спасибо

РЕДАКТИРОВАТЬ:

  • Я использую свечи 2.4.0
  • Я попробовал формат 64/128 МБ => ничего не изменилось, пока я не остановил работу

1 Ответ

0 голосов
/ 09 марта 2019

Да, решение проблемы

Моя проблема заключалась в том, что у меня было слишком мало данных, и спарк ожидал, что для записи файла паркета потребуется больше данных.

Чтобы сделать эту работу, я использую комментарий от@AlexandrosBiratsis (измените размер блока)

Еще раз всем спасибо @AlexandrosBiratsis большое спасибо

...