Как выполнить запросы к таблицам Hive, отправленным через Kafka, в приложении Spark Streaming? - PullRequest
0 голосов
/ 25 февраля 2019

У меня есть приложение Spark Streaming, которое считывает имена таблиц Hive в записях Kafka, например, table1 .. table2 .. table3 .. и т. Д.

Я хочу выполнить структурированный запрос к таблицам ульяи поток результатов в другую тему Кафки.

У меня это так

val hqls = rdd
    .filter(record => record.value() != null && record.value().trim.length > 0)
    .foreach(tableName=> publishData(tableName, sparkSession, kafkaProducer))

Мой publishData, как показано ниже

val df = sparkSession.sql("select * from " + tableName)
df.foreach { row =>
  // code to write to kafka
}

Когда я выполняю это, яполучить NullPointerException на sparkSession.sql вызов, как показано ниже:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 12.0 failed 8 times, most recent failure: Lost task 13.7 in stage 12.0 (TID 838, cilhdwks0001.sys.cigna.com, executor 1): java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140)

В других сообщениях я обнаружил, что невозможно запустить val df = sparkSession.sql(hql) внутри rdd.foreach, но не нашел, как это сделать

Если я изменю код на collect, как показано ниже, он работает.Почему?

val tablenames = rdd
  .filter(tableName => tableName != null && tableName.trim.length > 0)
  .collect() 
tablenames.foreach { tablename => 
  publishData(tablename, sparkSession, kafkaProducer)
}

Эффективно ли и будет ли оно правильно распределять нагрузку на кластер?

1 Ответ

0 голосов
/ 26 февраля 2019

У меня есть набор имен таблиц кустов, которые приходят через kafka в мое приложение Spark Streaming.Мне нужно выполнить запрос к каждой из таблиц

. Как только вы начинаете потоковую передачу таблиц Hive на Kafka, все виды Spark (независимо от того, используете ли вы Spark SQL или Spark Streaming или Spark Structured Streaming) являются записямичто все вместе (в одной партии) может или не может соответствовать таблицам Hive.За один цикл обработки можно увидеть только половину таблицы или полтора.Это не предсказуемо.

ИМХО Вам нужно отправить запись маркера, чтобы Spark мог отфильтровать все записи, принадлежащие одной таблице Hive, в набор данных.Это может сработать, но я сомневаюсь, что это наиболее желательное решение.

Короче говоря, в Spark вы работаете с одной записью Kafka (которая берется из таблицы Hive), и если вы не отправите дополнительные метаданные, Spark не узнаетнаходится ли вся таблица Hive в наборе данных.

.foreach(tableName=> publishData

Все, что вы делаете в publishData, происходит с исполнителями Spark, где SparkContext и SparkSession не доступныВы просто не можете использовать их на исполнителях (согласно дизайну Spark) и, следовательно, "NullPointerException at sparkSession.sql" Это ожидается.

Если я изменю его на (...).collect() (...) это работает, эффективно ли и будет ли оно правильно распределять нагрузку на кластер?

Любое collect в приложении Spark нарушает предпосылку Spark для распределения большихнабор данных между узлами в кластере, так что вы можете обрабатывать больше, чем может обрабатывать один компьютер.Любой collect сводит все распределенные данные в одну JVM драйвера и может привести к OutOfMemoryErrors (что могло быть причиной того, что вы рассматривали Apache Spark для обработки больших наборов данных в первую очередь).

...