Я использовал потоковую обработку Spark 2.1 для обработки данных о событиях от Kafka. После агрегирования данных я хочу дополнить их справочными данными, которые хранятся в HDFS (файлы паркета).
Код драйвера указан ниже.
val ss: SparkSession = SparkSession.builder()
.appName("app name").master("local[2]")
.config("spark.sql.warehouse.dir", "file:///c:/tmp/spark-warehouse")
.config("spark.sql.session.timeZone", "UTC")
.getOrCreate()
val sc = ss.sparkContext
val ssc = new StreamingContext(sc, 5) )
//read the data from Kafka here .....
SomeService.aggregate(kafkaInputStream).foreachRDD(rdd => {
val df =ss.read.parquet( filePath + "/*.parquet" )
println("Record Count in DF: " + df.count())
rdd.foreachPartition(partition => {
val futures = partition.map(event => {
sentMsgsNo.add(1L)
val eventEnriched = Enrichment(event, df)
kafkaSinkVar.value.sendCef(eventEnriched)
})
// by calling get() on the futures, we make sure to wait for all
// Producers started during this partition
// to finish before moving on.
futures.foreach(f => {
if (f.get() == null) {
failedSentMsgNo.add(1L)
} else {
confirmedSentMsgsNo.add(1L)
}
})
})
})
def enrichment (event: SomeEventType df: DataFrame): String = {
...
try {
df.select(col("id")).first().getString(0)
} catch {
case e: Exception => println("not record found")
}
}
В основном для каждого СДР я загружал эталонные данные в файл данных и передавал фрейм данных для обогащения каждой записи на основе некоторого идентификатора. Нет ошибки при выполнении кода, но обогащение никогда не происходит, проблема в том, что df (dataframe) является недопустимым деревом. Что не так с моим кодом / логикой?
Другой вопрос - это правильный способ сделать это? В основном я хочу читать данные только из HDFS на раздел, а не на запись.
Заранее спасибо!