как вставить данные в HIVE с помощью метода foreach в потоковой передаче с искрой - PullRequest
0 голосов
/ 26 января 2019

Я пытаюсь вставить данные в таблицу HIVE, используя метод foreach.

Использую искру 2.3.0.

Вот мой код

   df_drop_window.writeStream
     .foreach(new ForeachWriter[Row]() {
       override def open(partitionId: Long, epochId: Long): Boolean = true
       override def process(value: Row): Unit = {
         println(s">> Processing ${value}")
         // how to onvert the value as dataframe ?
       }
       override def close(errorOrNull: Throwable): Unit = {
       }
     }).outputMode("update").start()

Как вы можете видеть выше, я хочу преобразовать «значение» в фрейм данных и вставить данные в таблицу HIVE, например вставить в имя таблицы (выберите * из фрейма данных). кто-то может помочь, как это сделать? Я новичок в потоковом зажигании

Я вижу только следующую доступную опцию. некоторые могут сказать, как я могу преобразовать значение: строка в данные enter image description here кадр?

Я пробовал следующее, но получаю ошибку (org.apache.spark.SparkException: задача не сериализуема)

            df.writeStream
       .foreach(new ForeachWriter[Row]() {
       override def open(partitionId: Long, epochId: Long): Boolean = true
       override def process(value: Row): Unit = {
       val rowsRdd = sc.parallelize(Seq(value))
       val df2 = spark.createDataFrame(rowsRdd, schema)
       df2.createOrReplaceTempView("testing2")
       spark.sql("insert into table are.table_name1 Partition(date) select * from testing2")
       }
       override def close(errorOrNull: Throwable): Unit = {
       }
       }).outputMode("append").start()

1 Ответ

0 голосов
/ 16 июля 2019

Сеанс Spark не сериализуем на стороне исполнителя, вам нужно транслировать сеанс Spark

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...