Использование статических наборов данных внутри ForeachRDD в Java Spark Streaming для параллельной обработки RtD DStreams - PullRequest
0 голосов
/ 09 мая 2018

У нас есть DStreams, который использует сообщения JSON, используя пользовательский получатель. Это JSON-сообщения - не что иное, как пользовательский запрос в виде некоторых входных параметров.

JavaReceiverInputDStream<String> msgDStream = ssc.receiverStream(receiver);

Другое дело, у меня есть статический набор данных (предварительно загруженный), например,

Dataset<Row> loanDS = spark.read().parquet("/path")

Теперь в моем случае я хочу параллельно обрабатывать данные RtDs DStream (сообщения JSON)

msgDStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> stringJavaRDD) throws Exception {
if(!stringJavaRDD.isEmpty()) {

System.out.println("Json string: " + requestJSON);
stringJavaRDD.foreach(new VoidFunction<String>(){
public void call(String s) throws Exception{
parseJSON(s); // externam utility to parse the JSON messages
{here i want to build aggregate clause, select clause based on the static dataset loanDS with the JSON messages}
}
});
}}});

Когда я использую loandDS внутри stringJavaRDD.foreach, у него ничего нет, потому что foreach выполняется на рабочих узлах, а loanDS присутствует в драйвере.

Как добиться этого внутри foreach, потому что я хочу обрабатывать сообщения JSON внутри DStream RDD параллельно.

...