Пустой файл CSV генерируется после обработки искровой структурированной потоковой передачи - PullRequest
0 голосов
/ 11 января 2019

когда я пытаюсь записать в CSV некоторые потоковые данные с искровой структурой, я вижу, что пустые файлы деталей генерируются в месте расположения hdfs. Я попытался написать то же самое на консоли, и данные были сгенерированы на консоли.

   val spark =SparkSession.builder().appName("micro").
    enableHiveSupport().config("hive.exec.dynamic.partition", "true").
    config("hive.exec.dynamic.partition.mode", "nonstrict").
    config("spark.sql.streaming.checkpointLocation", "/user/sasidhr1/sparkCheckpoint").
    config("spark.debug.maxToStringFields",100).
    getOrCreate()

    val mySchema = StructType(Array(
     StructField("id", IntegerType),
     StructField("name", StringType),
     StructField("year", IntegerType),
     StructField("rating", DoubleType),
     StructField("duration", IntegerType)
    ))

    val xmlData = spark.readStream.option("sep", ",").schema(mySchema).csv("file:///home/sa1/kafdata/") 
    import java.util.Calendar
    val df_agg_without_time= xmlData.withColumn("event_time", to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))

    val df_agg_with_time = df_agg_without_time.withWatermark("event_time", "10 seconds").groupBy(window($"event_time", "10 seconds", "5 seconds"),$"year").agg(sum($"rating").as("rating"),sum($"duration").as("duration"))

   val pr = df_agg_with_time.drop("window")

    pr.writeStream.outputMode("append").format("csv").
    option("path", "hdfs://ccc/apps/hive/warehouse/rta.db/sample_movcsv/").start()

если я не уронил (окно) столбец, возникнет другая проблема ... эта проблема, которую я уже разместил здесь ... Как написать оконную агрегацию в формате CSV?

может кто-нибудь помочь с этим? как записать в hdfs как CSV-файл после агрегирования .. пожалуйста, помогите

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...