Исключение структурированной потоковой передачи: добавление режима вывода без водяного знака не поддерживается - PullRequest
0 голосов
/ 09 января 2019

Я выполнил простую группировку по годам и произвел некоторую агрегацию, как показано ниже. Я попытался добавить результат в путь hdfs, как показано ниже. Я получаю сообщение об ошибке,

   org.apache.spark.sql.AnalysisException: Append output mode not supported 
   when there are streaming aggregations on streaming DataFrames/DataSets 
   without watermark;;
   Aggregate [year#88], [year#88, sum(rating#89) AS rating#173, 
   sum(cast(duration#90 as bigint)) AS duration#175L]
   +- EventTimeWatermark event_time#96: timestamp, interval 10 seconds

ниже мой код. Может кто-нибудь, пожалуйста, помогите

    val spark =SparkSession.builder().appName("mddd").
    enableHiveSupport().config("hive.exec.dynamic.partition", "true").
    config("hive.exec.dynamic.partition.mode", "nonstrict").
    config("spark.sql.streaming.checkpointLocation", "/user/sa/sparkCheckpoint").
    config("spark.debug.maxToStringFields",100).
    getOrCreate()

    val mySchema = StructType(Array(
     StructField("id", IntegerType),
     StructField("name", StringType),
     StructField("year", IntegerType),
     StructField("rating", DoubleType),
     StructField("duration", IntegerType)
    ))

    val xmlData = spark.readStream.option("sep", ",").schema(mySchema).csv("file:///home/sa/kafdata/") 
    import java.util.Calendar
    val df_agg_without_time= xmlData.withColumn("event_time", to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))

    val df_agg_with_time = df_agg_without_time.withWatermark("event_time", "10 seconds").groupBy($"year").agg(sum($"rating").as("rating"),sum($"duration").as("duration"))
    val cop = df_agg_with_time.withColumn("column_name_with", to_json(struct($"window")))

    df_agg_with_time.writeStream.outputMode("append").partitionBy("year").format("csv").
    option("path", "hdfs://dio/apps/hive/warehouse/gt.db/sample_mov/").start()

мой ввод в формате csv

    id,name,year,rating,duration
    1,The Nightmare Before Christmas,1993,3.9,4568
    2,The Mummy,1993,3.5,4388
    3,Orphans of the Storm,1921,3.2,9062
    4,The Object of Beauty,1921,2.8,6150
    5,Night Tide,1963,2.8,5126
    6,One Magic Christmas,1963,3.8,5333
    7,Muriel's Wedding,1963,3.5,6323
    8,Mother's Boys,1963,3.4,5733

мой ожидаемый результат должен быть в формате hdf с разделом на год

    year,rating,duration
    1993,7.4,8956
    1921,6.0,15212
    1963,10.7,17389

Я действительно не уверен, что не так с моим подходом. пожалуйста помогите

1 Ответ

0 голосов
/ 10 января 2019

Это вопрос со многими аспектами:

  • API структурированной потоковой передачи имеет ограничения imho.
  • Можно передавать несколько запросов, и технически он выполняется, но не выводит никаких данных, поэтому делать это бесполезно - и он не может выполнять такие другие функции, даже если вы можете его указать.
  • В руководстве указано: withWatermark должен вызываться в том же столбце, что и отметка времени столбец, используемый в совокупности.

    Например, df.withWatermark («время», «1 min "). groupBy (" time2 "). count () недопустима в режиме добавления выходных данных, так как водяной знак определяется в другом столбце из агрегации колонка. Проще говоря, для добавления вам нужен WaterMark. Я думаю, что у вас есть проблема здесь.

  • Является ли следующее использование при использовании пути?

  .enableHiveSupport().config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  • Кроме того, ваш окончательный вариант использования неизвестен. Вопрос здесь заключается в том, является ли это хорошим подходом, но я слишком мало понимаю, чтобы оценить, мы просто предполагаем, что это так.
  • Мы предполагаем, что рейтинги одного и того же фильма будут частью будущей микробатки.
  • В ленте отсутствует event_time, но вы создаете его самостоятельно. Несколько нереально, но хорошо, мы можем с этим смириться, хотя TimeStamp немного проблематичен.
  • Советую заглянуть в этот блог http://blog.madhukaraphatak.com/introduction-to-spark-structured-streaming-part-12/, чтобы получить отличную оценку структурированной потоковой передачи.

Итак, в общем:

  • Из вариантов Завершить, Добавить и Обновить Я думаю, что вы выбрали правильный вариант Добавить. Обновление может быть использовано, но я оставляю это за рамками.
  • Но не поместил event_time в окно. Ты должен сделать это. В конце я привел пример, запустил в Spark Shell, где я не смог заставить работать класс case - вот почему это заняло так много времени, но в скомпилированной программе это не проблема, или DataBricks.
  • Функционально вы не можете написать несколько запросов, чтобы выполнить объединение, которое вы пробовали. В моем случае это просто приводит к ошибкам.
  • Я бы посоветовал вам использовать метку времени, которую я использовал, это проще, так как я не смог проверить все ваши вещи.

Тогда:

  • Либо запишите выходные данные этого модуля в тему KAFKA, и прочитайте эту тему в другой модуль, а затем выполните второе объединение и запись, учитывая, что вы можете получить несколько оценок фильмов в разных микробах.
  • Или запишите данные в том виде, в котором они есть, включая поле подсчета, а затем предоставьте слой представления для запросов, который учитывает тот факт, что было несколько записей.

Вот пример с использованием ввода сокетов и Spark Shell - который вы можете экстраполировать на ваши собственные данные, а также на выход микропакета (обратите внимание, что при просмотре данных возникают задержки):

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

val sparkSession = SparkSession.builder
  .master("local")
  .appName("example")
  .getOrCreate()
//create stream from socket

import sparkSession.implicits._
sparkSession.sparkContext.setLogLevel("ERROR")
val socketStreamDs = sparkSession.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]

val stockDs = socketStreamDs.map(value => (value.trim.split(","))).map(entries=>(new java.sql.Timestamp(entries(0).toLong),entries(1),entries(2).toDouble)).toDF("time","symbol","value")//.toDS() 

val windowedCount = stockDs
  .withWatermark("time", "20000 milliseconds")
  .groupBy( 
    window($"time", "10 seconds"),
           $"symbol" 
  )
  .agg(sum("value"), count($"symbol"))

val query =
  windowedCount.writeStream
    .format("console")
    .option("truncate", "false")
    .outputMode(OutputMode.Append())

query.start().awaitTermination()

Результат:

Batch: 14
----------------------------------------------+------+----------+-------------+  
|window                                       |symbol|sum(value)|count(symbol)|
+---------------------------------------------+------+----------+-------------+
|[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap1"|4200.0    |6            |
|[2016-04-27 04:34:30.0,2016-04-27 04:34:40.0]|"app1"|800.0     |2            |
|[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap2"|2500.0    |1            |
|[2016-04-27 04:34:40.0,2016-04-27 04:34:50.0]|"app1"|2800.0    |4            |
+---------------------------------------------+------+----------+-------------+

Это довольно большая тема, и вы должны смотреть на нее целостно.

Для вывода вы можете увидеть, что в некоторых случаях может быть полезно иметь счетчик, хотя вывод avg можно использовать для подсчета общего значения avg. Успех.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...