Я использую Spark Structured Streaming для классического варианта использования: я хочу прочитать тему kafka и записать поток в HDFS в формате паркета.
Вот мой код:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{ArrayType, DataTypes, StructType}
object TestKafkaReader extends App{
val spark = SparkSession
.builder
.appName("Spark-Kafka-Integration")
.master("local")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val kafkaDf = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","KAFKA_BROKER_IP:PORT")
//.option("subscribe", "test")
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.load()
val moviesJsonDf = kafkaDf.selectExpr("CAST(value AS STRING)")
// movie struct
val struct = new StructType()
.add("title", DataTypes.StringType)
.add("year", DataTypes.IntegerType)
.add("cast", ArrayType(DataTypes.StringType))
.add("genres", ArrayType(DataTypes.StringType))
val moviesNestedDf = moviesJsonDf.select(from_json($"value", struct).as("movie"))
// json flatten
val movieFlattenedDf = moviesNestedDf.selectExpr("movie.title", "movie.year", "movie.cast","movie.genres")
// convert to parquet and save to hdfs
val query = movieFlattenedDf
.writeStream
.outputMode("append")
.format("parquet")
.queryName("movies")
.option("checkpointLocation", "src/main/resources/chkpoint_dir")
.start("src/main/resources/output")
.awaitTermination()
}
Контекст:
- Я запускаю это напрямую из intellij (с установленной локальной искрой)
- Мне удается без проблем читать из kafka и писать в консоли (используярежим консоли)
- На данный момент я хочу записать файл на локальный компьютер (но я попытался на кластере HDFS, проблема та же)
Моя проблема:
Во время работы в папке ничего не записывается, мне нужно вручную остановить работу, чтобы наконец увидеть файлы.
Я подумал, что, возможно, что-то связано с .awaitTermination()
Для информации, я попытался удалить эту опцию, но без этого я получаю сообщение об ошибке, и работа просто не запускается.
Возможно, я не установил правильные опции, но после прочтения много раз документа и поиска поGoogle я ничего не нашел.
Не могли бы вы помочь мне в этом?
Спасибо
РЕДАКТИРОВАТЬ:
- Я использую свечи 2.4.0
- Я попробовал формат 64/128 МБ => ничего не изменилось, пока я не остановил работу