Spark Streaming DF Параллельная обработка - PullRequest
0 голосов
/ 15 января 2020

У меня проблема с параллельной обработкой записей в кадре данных Spark Streaming. Поток выглядит так: Kafka Topi c -> Spark Streaming (фрейм данных) ---> Внедрить BusinessRule -> Отправить результат в Kafka topi c. Текущий код обрабатывает данные в Sequence, задание Spark Streaming (DataFrame) выполняется нормально, но это отнимает много времени (из-за обработки последовательности каждой записи в foreach ()). В минуту Кафка выдает 300 сообщений, а Spark занимает 30 минут, чтобы обработать его. Бизнес-правило не сложно.

Проблема:

Я пытаюсь вызвать функцию Parallel поверх Foreach (), но как только я ее вызываю, Spark обрабатывает то же самое записывать несколько раз. Буду признателен за помощь, если вы сможете помочь в достижении параллелизма на этом этапе.

Сведения об окружении:

1) Формат данных: Json

2) Данные Сведения

Заказчик, Deptid, сорт, детали

Cust1,22, C1, maf

Cust2,23, C2, живопись

Cust3,24, C3, транспорт

Cust4,22, C4, общий

3) Версия Spark: 2.4.0 на CDH 6.3

4) Scala: 2.11 .11

5) Straming: Spark Streaming DataFreame

6) Разделение: 3

7) Брокер: 3

Вот фрагмент кода .

val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers",get_details("kafka_bootstrap_server")).option("subscribe",get_details("topic_ora")).option("startingOffsets", get_details("startingOffsets")).load()

val input_data = stream_action_function_data.select($"customer",$"deptid",$"grade",$"details")

val check_User=input_data.writeStream.foreachBatch ((batchDF: DataFrame, batchId: Long) => {
batchDF.persist()

val funcn = batchDF.collect.par.foreach( col =>  {  
import org.apache.spark.sql.Row
import spark.implicits._  
val data=col
println(data)
val descri= data.getString(4)

import org.apache.spark.sql.types.{StructType, StructField, StringType,TimestampType,IntegerType};
println("Description  "+ descri)  

val sc=spark.sparkContext

val rdd = sc.parallelize(Seq(data))

val schema = StructType(Array(StructField("customer", StringType, true),StructField("deptid", StringType, true),StructField("grade", StringType, true),StructField("details", StringType, true)

val df_record=spark.createDataFrame(rdd, schema) 

if ( df_record.getString(4)== "general")
{ 
Send KafkaTopicA()//Send Schema contents to Kafka Topic A in json format                                }  
else 
{ 
Send KafkaTopicB()//Send Schema contents to Kafka Topic B in json format    
}               
})

batchDF.unpersist() 

}).option("checkpointLocation",s"$functions_checkpointLocation").outputMode("append").start()*

Входные данные:

Cust4,22, C4, общие

Cust1,22, C1, maf

Cust2,23, C2, окраска

Cust3,24, C3, транспорт

Ожидаемый выход:

Topi c A :

Cust4,22, C4, общий

Topi c B:

Cust1,22, C1, maf

Cust2,23, C2 , картина

Cust3,24, C3, транспорт

Actua Выходная мощность:

Topi c A:

Cust4,22, C4, общий

Cust4,22, C4, общий

Cust4,22, C4, общий

Topi c B:

Cust1,22, C1, maf

Cust2,23, C2, живопись

Cust3,24, C3, транспорт

Cust1,22, C1, маф

Cust2,23, C2, живопись

Cust3,24, C3, транспорт

Спасибо !!

1 Ответ

0 голосов
/ 17 января 2020

Пожалуйста, попробуйте реализовать бизнес-логику c в foreachPartitions(), чтобы добиться распределения данных между исполнителями, что в конечном итоге упростит параллелизм. collect() Операция, с другой стороны, выполняется на уровне драйвера и, следовательно, не играет никакой роли в достижении параллелизма.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...