PySpark streaming application - each batch takes a long time to complete

I have a Spark streaming job that reads from Kafka (48 partitions) and writes to a Delta table. I can see that the application scales very poorly.

I would like to know what scope there is for improving this application.

The batch interval is 1 minute.
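
For context, below is a minimal sketch of the streaming entry point assumed here (Spark 2.x DStream API with the kafka-0-8 direct stream; the topic, broker and function names are illustrative, not taken from the actual job). Each micro-batch RDD is handed to a processing function as rdd2, which the code further down consumes.

# Minimal sketch of the assumed DStream entry point (illustrative names only)
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  # Spark 2.x kafka-0-8 direct stream API

spark = SparkSession.builder.appName("endpoints-stream").getOrCreate()
ssc = StreamingContext(spark.sparkContext, batchDuration=60)  # 1-minute batches

stream = KafkaUtils.createDirectStream(
    ssc,
    topics=["endpoints"],                                 # assumed topic name
    kafkaParams={"metadata.broker.list": "broker:9092"},  # assumed broker list
)

def process_batch(batch_time, rdd):
    if not rdd.isEmpty():
        rdd2 = rdd.map(lambda kv: kv[1])  # JSON string values; parsed by the code below
        # ... the read/transform/merge code shown further down runs here ...

stream.foreachRDD(process_batch)
ssc.start()
ssc.awaitTermination()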

Spark submit configuration

--conf "spark.streaming.dynamicAllocation.debug=true" \
--conf "spark.streaming.dynamicAllocation.delay.rounds=10" \
--conf "spark.streaming.dynamicAllocation.releaseRounds=5" \
--conf "spark.streaming.dynamicAllocation.minExecutors=5" \
--conf "spark.streaming.dynamicAllocation.maxExecutors=10" \
--conf "spark.streaming.dynamicAllocation.executorIdleTimeout=60s" \
--conf "spark.streaming.dynamicAllocation.scalingInterval=60" \
--conf "spark.streaming.dynamicAllocation.scalingUpRatio=1.2" \
--conf "spark.streaming.dynamicAllocation.scalingDownRatio=0.8" \
--conf "spark.streaming.dynamicAllocation.rememberBatchSize=1" \
--conf "spark.streaming.dynamicAllocation.reserveRate=0.2" \
--conf "spark.streaming.backpressure.enabled=True" \
--conf "spark.streaming.kafka.maxRatePerPartition=1000" \
--conf "spark.streaming.backpressure.pid.minRate=10000" \
--conf "spark.shuffle.service.enabled=true"  \
--conf "spark.shuffle.consolidateFiles=true"  \
--conf "spark.shuffle.spill=true"  \
--conf "spark.sql.shuffle.partitions=64" \
--conf "spark.cleaner.ttl=1200" \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"  \
--conf "spark.streaming.unpersist=true"  \
--conf "spark.dynamicAllocation.executorIdleTimeout=60s" \
--conf "spark.dynamicAllocation.cachedExecutorIdleTimeout=120s" \
--conf "spark.driver.memory=3G" \
--conf "spark.driver.cores=2"  \
--conf "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2"  \
--conf "spark.executor.memoryOverhead=2048"  \
--conf "spark.driver.memoryOverhead=1024" \
--conf "spark.default.parallelism=64" \
--conf "spark.eventLog.enabled=true" \
--num-executors=4 \
--executor-memory 4G \
--executor-cores 4 \
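
With backpressure enabled, the per-batch input volume is bounded by spark.streaming.kafka.maxRatePerPartition. A rough upper bound implied by the settings above (assuming all 48 Kafka partitions are active) works out as follows:

# Rough upper bound on records per micro-batch implied by the settings above
partitions = 48
max_rate_per_partition = 1000   # spark.streaming.kafka.maxRatePerPartition, records/sec
batch_interval_sec = 60         # 1-minute batch interval

max_records_per_batch = partitions * max_rate_per_partition * batch_interval_sec
print(max_records_per_batch)    # 2,880,000 records per batch at most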

PySpark read and write code

# Imports the snippet below relies on
from pyspark.sql import functions as F, Window
from pyspark.sql.functions import expr, col, asc, desc, last, row_number, coalesce
from delta.tables import DeltaTable

df1 = spark.read.json(rdd2, schema=endpoints_instance_read_schema)
df1.cache()
df2 = df1.withColumn("natdetected",expr("CASE WHEN trim(natdetected) in ('1','true','True','t') THEN  true WHEN trim(natdetected) in ('0','false','False','f') THEN false ELSE cast(trim(natdetected) as boolean) END AS natdetected")) \
                    .withColumn("proxy",expr("CASE WHEN trim(proxy) in ('1','true','True','t') THEN  true WHEN trim(proxy) in ('0','false','False','f') THEN false ELSE cast(trim(proxy) as boolean)  END AS proxy")) \
                    .withColumn("SIPHTTPProxyTransport",expr("CASE WHEN trim(SIPHTTPProxyTransport) in ('1','true','True','t') THEN  true WHEN trim(SIPHTTPProxyTransport) in ('0','false','False','f') THEN false ELSE cast(trim(SIPHTTPProxyTransport) as boolean)  END AS SIPHTTPProxyTransport")) \
                    .withColumn('creationTime', F.from_unixtime((F.col('creationTime'))/1000, 'yyyy-MM-dd HH:mm:ss.SS').cast("timestamp")) \
                    .withColumn('leaveTime', F.from_unixtime((F.col('leaveTime'))/1000, 'yyyy-MM-dd HH:mm:ss.SS').cast("timestamp"))
df3 = df2.filter(df2.callguid.isNotNull())
df4 = df3.repartition(col("callguid"))

rank_spec = Window.partitionBy(col("callguid")).orderBy(desc("cdctimestamp"))
data_spec = Window.partitionBy(col("callguid")).orderBy(asc("cdctimestamp")).rowsBetween(Window.unboundedPreceding, Window.currentRow)

df5 = df4.withColumnRenamed("callguid", "callguid") \
                                .withColumnRenamed("creationTime", "creation_time") \
                                .withColumnRenamed("DisconnectReason", "disconnect_reason") \
                                .withColumnRenamed("leaveTime", "leave_time") \
                                .withColumnRenamed("localaddress", "local_address") \
                                .withColumnRenamed("meetingUuid", "meeting_uuid") \
                                .withColumnRenamed("mhaddress", "mh_address") \
                                .withColumnRenamed("mixaddress", "mixer_address") \
                                .withColumnRenamed("natdetected", "nat_detected") \
                                .withColumnRenamed("ProxyInfo", "proxy_info") \
                                .withColumnRenamed("SIPHTTPProxyTransport", "sip_proxy_transport") \
                                .withColumnRenamed("audiomedaddress", "audio_med_address") \
                                .withColumnRenamed("ReflectorAddress", "reflector_address") \
                                .withColumnRenamed("contentmedaddress", "content_med_address")

df6 = df5.withColumn('creation_time_var', last("creation_time", True).over(data_spec)) \
                                .withColumn('disconnect_reason_var', last("disconnect_reason", True).over(data_spec)) \
                                .withColumn('leave_time_var', last("leave_time", True).over(data_spec)) \
                                .withColumn('local_address_var', last("local_address", True).over(data_spec)) \
                                .withColumn('meeting_uuid_var', last("meeting_uuid", True).over(data_spec)) \
                                .withColumn('mh_address_var', last("mh_address", True).over(data_spec)) \
                                .withColumn('mixer_address_var', last("mixer_address", True).over(data_spec)) \
                                .withColumn('nat_detected_var', last("nat_detected", True).over(data_spec)) \
                                .withColumn('delta_partition_var', last("delta_partition", True).over(data_spec)) \
                                .withColumn('cdctimestamp_var', last("cdctimestamp", True).over(data_spec)) \
                                .withColumn('proxy_var', last("proxy", True).over(data_spec)) \
                                .withColumn('proxy_info_var', last("proxy_info", True).over(data_spec)) \
                                .withColumn('sip_proxy_transport_var', last("sip_proxy_transport", True).over(data_spec)) \
                                .withColumn('audio_med_address_var', last("audio_med_address", True).over(data_spec)) \
                                .withColumn('reflector_address_var', last("reflector_address", True).over(data_spec)) \
                                .withColumn('content_med_address_var', last("content_med_address", True).over(data_spec)) \
                                .withColumn('rank', row_number().over(rank_spec))

df7 = df6.filter(df6.rank == 1)
DeltaTable.forPath(spark, HDFS_DIR).alias("t").merge(df7.alias("s"), "s.callguid = t.callguid and s.meeting_uuid = t.meeting_uuid and s.delta_partition = t.delta_partition").whenMatchedUpdate( \
                                set = \
                                    { \
                                        "cdctimestamp": coalesce("s.cdctimestamp_var","t.cdctimestamp") \
                                        ,"creation_time": coalesce("s.creation_time_var","t.creation_time") \
                                        ,"disconnect_reason": coalesce("s.disconnect_reason_var","t.disconnect_reason") \
                                        ,"leave_time": coalesce("s.leave_time_var","t.leave_time") \
                                        ,"local_address": coalesce("s.local_address_var","t.local_address") \
                                        ,"mh_address": coalesce("s.mh_address_var","t.mh_address") \
                                        ,"mixer_address": coalesce("s.mixer_address_var","t.mixer_address") \
                                        ,"nat_detected": coalesce("s.nat_detected_var","t.nat_detected") \
                                        ,"proxy": coalesce("s.proxy_var","t.proxy") \
                                        ,"proxy_info": coalesce("s.proxy_info_var","t.proxy_info") \
                                        ,"sip_proxy_transport": coalesce("s.sip_proxy_transport_var","t.sip_proxy_transport") \
                                        ,"audio_med_address": coalesce("s.audio_med_address_var","t.audio_med_address") \
                                        ,"reflector_address": coalesce("s.reflector_address_var","t.reflector_address") \
                                        ,"content_med_address": coalesce("s.content_med_address_var","t.content_med_address") \
                                        }) \
                                        .whenNotMatchedInsert(values = \
                                        { \
                                        "callguid": "s.callguid" \
                                        ,"delta_partition": "s.delta_partition" \
                                        ,"cdctimestamp": "s.cdctimestamp_var" \
                                        ,"creation_time": "s.creation_time_var" \
                                        ,"disconnect_reason": "s.disconnect_reason_var" \
                                        ,"leave_time": "s.leave_time_var" \
                                        ,"local_address": "s.local_address_var" \
                                        ,"meeting_uuid": "s.meeting_uuid_var" \
                                        ,"mh_address": "s.mh_address_var" \
                                        ,"mixer_address": "s.mixer_address_var" \
                                        ,"nat_detected": "s.nat_detected_var" \
                                        ,"proxy": "s.proxy_var" \
                                        ,"proxy_info": "s.proxy_info_var" \
                                        ,"sip_proxy_transport": "s.sip_proxy_transport_var" \
                                        ,"audio_med_address": "s.audio_med_address_var" \
                                        ,"reflector_address": "s.reflector_address_var" \
                                        ,"content_med_address": "s.content_med_address_var" \
                                        } \
                                        ).execute()
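
To clarify what the windowing above does: the last(..., True) calls forward-fill the most recent non-null value per callguid, and the rank == 1 filter then keeps only the latest row per callguid before the merge. A self-contained toy version of that pattern (made-up data and column names, not part of the actual job) looks like this:

# Toy illustration of the forward-fill + keep-latest-row pattern used above
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, last, row_number, asc, desc

spark = SparkSession.builder.getOrCreate()
events = spark.createDataFrame(
    [("c1", 1, "10.0.0.1"), ("c1", 2, "10.0.0.2"), ("c1", 3, None)],
    ["callguid", "cdctimestamp", "local_address"],
)

fill_spec = Window.partitionBy("callguid").orderBy(asc("cdctimestamp")) \
                  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
rank_spec = Window.partitionBy("callguid").orderBy(desc("cdctimestamp"))

latest = (events
          .withColumn("local_address_var", last("local_address", True).over(fill_spec))
          .withColumn("rank", row_number().over(rank_spec))
          .filter(col("rank") == 1))
latest.show()
# -> callguid=c1, cdctimestamp=3, local_address_var=10.0.0.2 (latest row, null filled)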

Here are screenshots of the Spark UI. [Spark UI screenshots]

...