Искровой поток + запрос таблицы кустов в каждой потоковой партии? - PullRequest
0 голосов
/ 14 марта 2019

У меня есть приложение Spark Streaming, получающее данные из Flume, и после некоторых преобразований оно записывает на Hbase.

Но чтобы сделать эти преобразования, мне нужно запросить некоторые данные из таблицы улья.Затем начинается проблема.

Я не могу использовать sqlContext или hiveContext внутри преобразований (они не сериализуются), и когда я пишу код вне преобразования, он запускается только один раз.

Как мне сделать так, чтобы этот код выполнялся в каждой потоковой партии?

  def TB_PARAMETRIZACAO_TGC(sqlContext: HiveContext): Map[String,(String,String)] = {
  val df_consulta = sqlContext.sql("SELECT TGC,TIPO,DESCRICAO FROM dl_prepago.TB_PARAMETRIZACAO_TGC")
  val resultado = df_consulta.map(x => x(Consulta_TB_PARAMETRIZACAO_TGC.TGC.id).toString
      -> (x(Consulta_TB_PARAMETRIZACAO_TGC.TIPO.id).toString, x(Consulta_TB_PARAMETRIZACAO_TGC.DESCRICAO.id).toString)).collectAsMap()
  resultado
}

1 Ответ

0 голосов
/ 14 марта 2019

Попробуйте этот очень простой подход ниже, отметив, что статические таблицы JOIN могут кэшироваться и что они не должны быть слишком большими, в противном случае эта статическая переменная должна быть KV Store LKP, скажем Hbase:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object StreamJoinStatic {

case class Sales(
  transactionId: String,
  customerId:    String,
  itemId:        String,
  amountPaid:    Double)

case class Customer(customerId: String, customerName: String)

def main(args: Array[String]): Unit = {
 val sparkSession = SparkSession.builder
   .master("local") // Not recommended
   .appName("exampleStaticJoinStrStr")
   .getOrCreate()

//create stream from socket
val socketStreamDf = sparkSession.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 50050)
  .load()

import sparkSession.implicits._
//take customer data as static df from where ever
val customerDs = sparkSession.read
  .format("csv")
  .option("header", true)
  .load("src/main/resources/customers.csv")
  .as[Customer]

import sparkSession.implicits._
val dataDf = socketStreamDf.as[String].flatMap(value ? value.split(" "))
val salesDs = dataDf
  .as[String]
  .map(value ? {
    val values = value.split(",")
    Sales(values(0), values(1), values(2), values(3).toDouble)
  })

val joinedDs = salesDs.join(customerDs, "customerId")

val query = joinedDs.writeStream.format("console").outputMode(OutputMode.Append())

query.start().awaitTermination()
}
}

Затем адаптируйтесь к вашей конкретной ситуации.

...