У меня проблема с параллельной обработкой записей в кадре данных Spark Streaming. Поток выглядит так: Kafka Topi c -> Spark Streaming (фрейм данных) ---> Внедрить BusinessRule -> Отправить результат в Kafka topi c. Текущий код обрабатывает данные в Sequence, задание Spark Streaming (DataFrame) выполняется нормально, но это отнимает много времени (из-за обработки последовательности каждой записи в foreach ()). В минуту Кафка выдает 300 сообщений, а Spark занимает 30 минут, чтобы обработать его. Бизнес-правило не сложно.
Проблема:
Я пытаюсь вызвать функцию Parallel поверх Foreach (), но как только я ее вызываю, Spark обрабатывает то же самое записывать несколько раз. Буду признателен за помощь, если вы сможете помочь в достижении параллелизма на этом этапе.
Сведения об окружении:
1) Формат данных: Json
2) Данные Сведения
Заказчик, Deptid, сорт, детали
Cust1,22, C1, maf
Cust2,23, C2, живопись
Cust3,24, C3, транспорт
Cust4,22, C4, общий
3) Версия Spark: 2.4.0 на CDH 6.3
4) Scala: 2.11 .11
5) Straming: Spark Streaming DataFreame
6) Разделение: 3
7) Брокер: 3
Вот фрагмент кода .
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers",get_details("kafka_bootstrap_server")).option("subscribe",get_details("topic_ora")).option("startingOffsets", get_details("startingOffsets")).load()
val input_data = stream_action_function_data.select($"customer",$"deptid",$"grade",$"details")
val check_User=input_data.writeStream.foreachBatch ((batchDF: DataFrame, batchId: Long) => {
batchDF.persist()
val funcn = batchDF.collect.par.foreach( col => {
import org.apache.spark.sql.Row
import spark.implicits._
val data=col
println(data)
val descri= data.getString(4)
import org.apache.spark.sql.types.{StructType, StructField, StringType,TimestampType,IntegerType};
println("Description "+ descri)
val sc=spark.sparkContext
val rdd = sc.parallelize(Seq(data))
val schema = StructType(Array(StructField("customer", StringType, true),StructField("deptid", StringType, true),StructField("grade", StringType, true),StructField("details", StringType, true)
val df_record=spark.createDataFrame(rdd, schema)
if ( df_record.getString(4)== "general")
{
Send KafkaTopicA()//Send Schema contents to Kafka Topic A in json format }
else
{
Send KafkaTopicB()//Send Schema contents to Kafka Topic B in json format
}
})
batchDF.unpersist()
}).option("checkpointLocation",s"$functions_checkpointLocation").outputMode("append").start()*
Входные данные:
Cust4,22, C4, общие
Cust1,22, C1, maf
Cust2,23, C2, окраска
Cust3,24, C3, транспорт
Ожидаемый выход:
Topi c A :
Cust4,22, C4, общий
Topi c B:
Cust1,22, C1, maf
Cust2,23, C2 , картина
Cust3,24, C3, транспорт
Actua Выходная мощность:
Topi c A:
Cust4,22, C4, общий
Cust4,22, C4, общий
Cust4,22, C4, общий
Topi c B:
Cust1,22, C1, maf
Cust2,23, C2, живопись
Cust3,24, C3, транспорт
Cust1,22, C1, маф
Cust2,23, C2, живопись
Cust3,24, C3, транспорт
Спасибо !!