Я пытаюсь вставить данные в таблицу HIVE, используя метод foreach.
Использую искру 2.3.0.
Вот мой код
df_drop_window.writeStream
.foreach(new ForeachWriter[Row]() {
override def open(partitionId: Long, epochId: Long): Boolean = true
override def process(value: Row): Unit = {
println(s">> Processing ${value}")
// how to onvert the value as dataframe ?
}
override def close(errorOrNull: Throwable): Unit = {
}
}).outputMode("update").start()
Как вы можете видеть выше, я хочу преобразовать «значение» в фрейм данных и вставить данные в таблицу HIVE, например вставить в имя таблицы (выберите * из фрейма данных). кто-то может помочь, как это сделать? Я новичок в потоковом зажигании
Я вижу только следующую доступную опцию. некоторые могут сказать, как я могу преобразовать значение: строка в данные кадр?
Я пробовал следующее, но получаю ошибку (org.apache.spark.SparkException: задача не сериализуема)
df.writeStream
.foreach(new ForeachWriter[Row]() {
override def open(partitionId: Long, epochId: Long): Boolean = true
override def process(value: Row): Unit = {
val rowsRdd = sc.parallelize(Seq(value))
val df2 = spark.createDataFrame(rowsRdd, schema)
df2.createOrReplaceTempView("testing2")
spark.sql("insert into table are.table_name1 Partition(date) select * from testing2")
}
override def close(errorOrNull: Throwable): Unit = {
}
}).outputMode("append").start()