когда я пытаюсь записать в CSV некоторые потоковые данные с искровой структурой, я вижу, что пустые файлы деталей генерируются в месте расположения hdfs. Я попытался написать то же самое на консоли, и данные были сгенерированы на консоли.
val spark =SparkSession.builder().appName("micro").
enableHiveSupport().config("hive.exec.dynamic.partition", "true").
config("hive.exec.dynamic.partition.mode", "nonstrict").
config("spark.sql.streaming.checkpointLocation", "/user/sasidhr1/sparkCheckpoint").
config("spark.debug.maxToStringFields",100).
getOrCreate()
val mySchema = StructType(Array(
StructField("id", IntegerType),
StructField("name", StringType),
StructField("year", IntegerType),
StructField("rating", DoubleType),
StructField("duration", IntegerType)
))
val xmlData = spark.readStream.option("sep", ",").schema(mySchema).csv("file:///home/sa1/kafdata/")
import java.util.Calendar
val df_agg_without_time= xmlData.withColumn("event_time", to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))
val df_agg_with_time = df_agg_without_time.withWatermark("event_time", "10 seconds").groupBy(window($"event_time", "10 seconds", "5 seconds"),$"year").agg(sum($"rating").as("rating"),sum($"duration").as("duration"))
val pr = df_agg_with_time.drop("window")
pr.writeStream.outputMode("append").format("csv").
option("path", "hdfs://ccc/apps/hive/warehouse/rta.db/sample_movcsv/").start()
если я не уронил (окно) столбец, возникнет другая проблема ... эта проблема, которую я уже разместил здесь ... Как написать оконную агрегацию в формате CSV?
может кто-нибудь помочь с этим? как записать в hdfs как CSV-файл после агрегирования .. пожалуйста, помогите