Как написать оконную агрегацию в формате CSV? - PullRequest
0 голосов
/ 09 января 2019

Я занимаюсь разработкой приложения Spark Structured Streaming, которое транслирует CSV-файлы и объединяет их со статическими данными. Я произвел некоторую агрегацию после объединения.

При записи результата запроса в HDFS в формате CSV я получаю следующую ошибку:

19/01/09 14:00:30 ERROR MicroBatchExecution: Query [id = 830ca987-b55a-4c03-aa13-f71bc57e47ad, runId = 87cdb029-0022-4f1c-b55e-c2443c9f058a] terminated with error java.lang.UnsupportedOperationException: CSV data source does not support struct<start:timestamp,end:timestamp> data type.
    at org.apache.spark.sql.execution.datasources.csv.CSVUtils$.org$apache$spark$sql$execution$datasources$csv$CSVUtils$$verifyType$1(CSVUtils.scala:127)
    at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema$1.apply(CSVUtils.scala:131)
    at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema$1.apply(CSVUtils.scala:131)

Что может быть основной причиной?

Вот соответствующие части моего кода:

val spark = SparkSession
  .builder
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("spark.sql.streaming.checkpointLocation", "/user/sas/sparkCheckpoint")
  .getOrCreate

...

val df_agg_without_time = sqlResultjoin
  .withWatermark("event_time", "10 seconds")
  .groupBy(
    window($"event_time", "10 seconds", "5 seconds"),
    $"section",
    $"timestamp")
  .agg(sum($"total") as "total")

...

finalTable_repo
  .writeStream
  .outputMode("append")
  .partitionBy("xml_data_dt")
  .format("csv")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .option("path", "hdfs://op/apps/hive/warehouse/area.db/finalTable_repo")
  .start

1 Ответ

0 голосов
/ 11 января 2019

Строка, где выполняется агрегация .groupBy(window($"event_time", "10 seconds", "5 seconds"), $"section", $"timestamp"), создает тип данных struct<start:timestamp,end:timestamp>, который не поддерживается источником данных CSV.

Просто df_agg_without_time.printSchema и вы увидите столбец.

Решение состоит в том, чтобы просто преобразовать его в какой-то другой более простой тип (возможно, с select или withColumn) или просто select из него (т.е. не включить в следующий кадр данных).


Ниже приведен пример структурированного запроса (не потокового), который показывает схему, которую использует ваш потоковый структурированный запрос (при создании df_agg_without_time).

val q = spark
  .range(4)
  .withColumn("t", current_timestamp)
  .groupBy(window($"t", "10 seconds"))
  .count
scala> q.printSchema
root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)

Для примера потокового запроса вы можете использовать источник данных о скорости.

val q = spark
  .readStream
  .format("rate")
  .load
  .groupBy(window($"timestamp", "10 seconds"))
  .count
scala> q.printSchema
root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...