Обогатите Spark Streaming данными из HDFS - PullRequest
0 голосов
/ 10 мая 2019

Я использовал потоковую обработку Spark 2.1 для обработки данных о событиях от Kafka. После агрегирования данных я хочу дополнить их справочными данными, которые хранятся в HDFS (файлы паркета).

Код драйвера указан ниже.

val ss: SparkSession = SparkSession.builder()
    .appName("app name").master("local[2]")
    .config("spark.sql.warehouse.dir", "file:///c:/tmp/spark-warehouse")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()

val sc = ss.sparkContext 
val ssc = new StreamingContext(sc, 5) )

//read the data from Kafka here .....

SomeService.aggregate(kafkaInputStream).foreachRDD(rdd => {
    val df =ss.read.parquet( filePath + "/*.parquet" )
    println("Record Count in DF: " + df.count())

    rdd.foreachPartition(partition => {
        val futures = partition.map(event => {
            sentMsgsNo.add(1L)
            val eventEnriched = Enrichment(event, df)
            kafkaSinkVar.value.sendCef(eventEnriched)
        })
        // by calling get() on the futures, we make sure to wait for all
        // Producers started during this partition
        // to finish before moving on.
        futures.foreach(f => {
            if (f.get() == null) {
                failedSentMsgNo.add(1L)
            } else {
                confirmedSentMsgsNo.add(1L)
            }
        })
    })
})
def enrichment (event: SomeEventType df: DataFrame): String = {
    ...
    try {
        df.select(col("id")).first().getString(0) 
    } catch {
        case e: Exception => println("not record found") 
    }
}

В основном для каждого СДР я загружал эталонные данные в файл данных и передавал фрейм данных для обогащения каждой записи на основе некоторого идентификатора. Нет ошибки при выполнении кода, но обогащение никогда не происходит, проблема в том, что df (dataframe) является недопустимым деревом. Что не так с моим кодом / логикой?

Другой вопрос - это правильный способ сделать это? В основном я хочу читать данные только из HDFS на раздел, а не на запись.

Заранее спасибо!

...