У меня есть приложение Spark Streaming, получающее данные из Flume, и после некоторых преобразований оно записывает на Hbase.
Но чтобы сделать эти преобразования, мне нужно запросить некоторые данные из таблицы улья.Затем начинается проблема.
Я не могу использовать sqlContext или hiveContext внутри преобразований (они не сериализуются), и когда я пишу код вне преобразования, он запускается только один раз.
Как мне сделать так, чтобы этот код выполнялся в каждой потоковой партии?
def TB_PARAMETRIZACAO_TGC(sqlContext: HiveContext): Map[String,(String,String)] = {
val df_consulta = sqlContext.sql("SELECT TGC,TIPO,DESCRICAO FROM dl_prepago.TB_PARAMETRIZACAO_TGC")
val resultado = df_consulta.map(x => x(Consulta_TB_PARAMETRIZACAO_TGC.TGC.id).toString
-> (x(Consulta_TB_PARAMETRIZACAO_TGC.TIPO.id).toString, x(Consulta_TB_PARAMETRIZACAO_TGC.DESCRICAO.id).toString)).collectAsMap()
resultado
}