У меня есть приложение Spark Streaming, которое считывает имена таблиц Hive в записях Kafka, например, table1 .. table2 .. table3 .. и т. Д.
Я хочу выполнить структурированный запрос к таблицам ульяи поток результатов в другую тему Кафки.
У меня это так
val hqls = rdd
.filter(record => record.value() != null && record.value().trim.length > 0)
.foreach(tableName=> publishData(tableName, sparkSession, kafkaProducer))
Мой publishData
, как показано ниже
val df = sparkSession.sql("select * from " + tableName)
df.foreach { row =>
// code to write to kafka
}
Когда я выполняю это, яполучить NullPointerException
на sparkSession.sql
вызов, как показано ниже:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 12.0 failed 8 times, most recent failure: Lost task 13.7 in stage 12.0 (TID 838, cilhdwks0001.sys.cigna.com, executor 1): java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140)
В других сообщениях я обнаружил, что невозможно запустить val df = sparkSession.sql(hql)
внутри rdd.foreach
, но не нашел, как это сделать
Если я изменю код на collect
, как показано ниже, он работает.Почему?
val tablenames = rdd
.filter(tableName => tableName != null && tableName.trim.length > 0)
.collect()
tablenames.foreach { tablename =>
publishData(tablename, sparkSession, kafkaProducer)
}
Эффективно ли и будет ли оно правильно распределять нагрузку на кластер?