Как вставить искровой структурированный потоковый DataFrame во внешнюю таблицу / местоположение Hive? - PullRequest
0 голосов
/ 28 декабря 2018

Один запрос на интеграцию искровой структурированной потоковой передачи с таблицей HIVE.

Я попытался привести несколько примеров потоковой структурированной искровой обработки.

вот мой пример

 val spark =SparkSession.builder().appName("StatsAnalyzer")
     .enableHiveSupport()
     .config("hive.exec.dynamic.partition", "true")
     .config("hive.exec.dynamic.partition.mode", "nonstrict")
     .config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
     .getOrCreate()

 // Register the dataframe as a Hive table

 val userSchema = new StructType().add("name", "string").add("age", "integer")
 val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta") 
 csvDF.createOrReplaceTempView("updates")
 val query= spark.sql("insert into table_abcd select * from updates")

 query.writeStream.start()

Как вы можете видеть на последнем шаге при записи фрейма данных в расположение hdfs, данные не вставляются в захватывающий каталог (мой существующийкаталог с некоторыми старыми данными, разделенными на «age»).

Я получаю

spark.sql.AnalysisException: запросы с потоковым источником должны выполняться с помощью writeStream start ()

Можете ли вы помочь, почему я не могу вставить данные в существующий каталог в папке hdfs?или есть какой-то другой способ, которым я могу сделать операцию "вставить в" на таблице улья?

В поисках решения

Ответы [ 2 ]

0 голосов
/ 25 января 2019

На всякий случай, если кто-то на самом деле попробовал код из Яцека Ласковски, он знает, что он на самом деле не компилируется в Spark 2.4.0 (проверьте мой gist , протестированный на AWS EMR 5.20.0 и vanilla Spark).Так что я думаю, это была его идея, как это должно работать в какой-то будущей версии Spark.Реальный код:

scala> import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Dataset

scala> sq.writeStream.foreachBatch((batchDs: Dataset[_], batchId: Long) => batchDs.show).start
res0: org.apache.spark.sql.streaming.StreamingQuery = 
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5ebc0bf5
0 голосов
/ 31 декабря 2018

Spark Structured Streaming не поддерживает запись результата потокового запроса в таблицу Hive.

scala> println(spark.version)
2.4.0

val sq = spark.readStream.format("rate").load
scala> :type sq
org.apache.spark.sql.DataFrame

scala> assert(sq.isStreaming)

scala> sq.writeStream.format("hive").start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:246)
  ... 49 elided

Если целевая система (она же раковина ) не поддерживается, вы можете использовать операции foreach и foreachBatch (выделение шахты):

Операции foreach и foreachBatch позволяют применять произвольные операции и записывать логику на вывод потокового запроса .Они имеют несколько разные варианты использования - в то время как foreach допускает настраиваемую логику записи в каждой строке, foreachBatch допускает произвольные операции и настраиваемую логику на выходе каждой микропакета.

Я думаю foreachBatch Ваша лучшая ставка.

import org.apache.spark.sql.DataFrame
sq.writeStream.foreachBatch { case (ds: DataFrame, batchId: Long) =>
  // do whatever you want with your input DataFrame
  // incl. writing to Hive
  // I simply decided to print out the rows to the console
  ds.show
}.start

Существует также Разъем Apache Hive Warehouse , с которым я никогда не работал, но, похоже, он может помочь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...