У меня есть задание Spark Streaming, которое читает данные из Kafka (48 разделов) и записывает их в Delta-таблицу. Я вижу, что приложение очень плохо масштабируется.
Я хотел бы понять, что в этом приложении можно улучшить.
Интервал пакетной обработки - 1 мин.
Параметры запуска spark-submit:
--conf "spark.streaming.dynamicAllocation.debug=true" \
--conf "spark.streaming.dynamicAllocation.delay.rounds=10" \
--conf "spark.streaming.dynamicAllocation.releaseRounds=5" \
--conf "spark.streaming.dynamicAllocation.minExecutors=5" \
--conf "spark.streaming.dynamicAllocation.maxExecutors=10" \
--conf "spark.streaming.dynamicAllocation.executorIdleTimeout=60s" \
--conf "spark.streaming.dynamicAllocation.scalingInterval=60" \
--conf "spark.streaming.dynamicAllocation.scalingUpRatio=1.2" \
--conf "spark.streaming.dynamicAllocation.scalingDownRatio=0.8" \
--conf "spark.streaming.dynamicAllocation.rememberBatchSize=1" \
--conf "spark.streaming.dynamicAllocation.reserveRate=0.2" \
--conf "spark.streaming.backpressure.enabled=True" \
--conf "spark.streaming.kafka.maxRatePerPartition=1000" \
--conf "spark.streaming.backpressure.pid.minRate=10000" \
--conf "spark.shuffle.service.enabled=true" \
--conf "spark.shuffle.consolidateFiles=true" \
--conf "spark.shuffle.spill=true" \
--conf "spark.sql.shuffle.partitions=64" \
--conf "spark.cleaner.ttl=1200" \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.streaming.unpersist=true" \
--conf "spark.dynamicAllocation.executorIdleTimeout=60s" \
--conf "spark.dynamicAllocation.cachedExecutorIdleTimeout=120s" \
--conf "spark.driver.memory=3G" \
--conf "spark.driver.cores=2" \
--conf "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2" \
--conf "spark.executor.memoryOverhead=2048" \
--conf "spark.driver.memoryOverhead=1024" \
--conf "spark.default.parallelism=64" \
--conf "spark.eventLog.enabled=true" \
--num-executors=4 \
--executor-memory 4G \
--executor-cores 4 \
Код PySpark для чтения и записи:
# Read rdd2 as JSON with the predefined schema and mark the result for caching
# (cache() hands back the same DataFrame, so it can be chained).
# NOTE(review): nothing in this section unpersists df1 — confirm it is released elsewhere.
json_reader = spark.read.schema(endpoints_instance_read_schema)
df1 = json_reader.json(rdd2).cache()
def _epoch_millis_to_timestamp(column_name):
    """Convert an epoch-milliseconds column to a timestamp, keeping the milliseconds.

    from_unixtime() works on whole seconds, so feeding it ``millis / 1000``
    silently drops the fractional part. Dividing as an exact decimal and casting
    the resulting seconds value straight to timestamp preserves it and avoids
    the string round trip.
    """
    seconds = F.col(column_name).cast("decimal(20,0)") / 1000
    return seconds.cast("timestamp")


# Normalise the raw columns of df1:
#   * natdetected / proxy / SIPHTTPProxyTransport: textual flags -> boolean
#     ('1','true','True','t' => true; '0','false','False','f' => false;
#     anything else falls back to a plain cast of the trimmed value);
#   * creationTime / leaveTime: epoch milliseconds -> timestamp.
df2 = (
    df1
    .withColumn("natdetected", expr("CASE WHEN trim(natdetected) in ('1','true','True','t') THEN true WHEN trim(natdetected) in ('0','false','False','f') THEN false ELSE cast(trim(natdetected) as boolean) END AS natdetected"))
    .withColumn("proxy", expr("CASE WHEN trim(proxy) in ('1','true','True','t') THEN true WHEN trim(proxy) in ('0','false','False','f') THEN false ELSE cast(trim(proxy) as boolean) END AS proxy"))
    .withColumn("SIPHTTPProxyTransport", expr("CASE WHEN trim(SIPHTTPProxyTransport) in ('1','true','True','t') THEN true WHEN trim(SIPHTTPProxyTransport) in ('0','false','False','f') THEN false ELSE cast(trim(SIPHTTPProxyTransport) as boolean) END AS SIPHTTPProxyTransport"))
    .withColumn('creationTime', _epoch_millis_to_timestamp('creationTime'))
    .withColumn('leaveTime', _epoch_millis_to_timestamp('leaveTime'))
)
# Drop rows that have no callguid, then redistribute the remaining rows by callguid.
has_call_guid = df2["callguid"].isNotNull()
df3 = df2.where(has_call_guid)
df4 = df3.repartition("callguid")
# Two windows over the rows sharing one callguid:
#   rank_spec - ordered by cdctimestamp descending (used below for row numbering);
#   data_spec - ordered by cdctimestamp ascending, with a row frame running from
#               the start of the partition up to the current row.
per_call = Window.partitionBy("callguid")
rank_spec = per_call.orderBy(col("cdctimestamp").desc())
data_spec = per_call.orderBy(col("cdctimestamp").asc()).rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)
# Rename the source columns to the snake_case names used downstream.
# The renames are applied one by one, in the listed order.
column_renames = [
    # Identity rename kept from the original pipeline.
    # NOTE(review): presumably only matters if the schema spells the name with
    # different letter case — confirm before removing.
    ("callguid", "callguid"),
    ("creationTime", "creation_time"),
    ("DisconnectReason", "disconnect_reason"),
    ("leaveTime", "leave_time"),
    ("localaddress", "local_address"),
    ("meetingUuid", "meeting_uuid"),
    ("mhaddress", "mh_address"),
    ("mixaddress", "mixer_address"),
    ("natdetected", "nat_detected"),
    ("ProxyInfo", "proxy_info"),
    ("SIPHTTPProxyTransport", "sip_proxy_transport"),
    ("audiomedaddress", "audio_med_address"),
    ("ReflectorAddress", "reflector_address"),
    ("contentmedaddress", "content_med_address"),
]
df5 = df4
for old_name, new_name in column_renames:
    df5 = df5.withColumnRenamed(old_name, new_name)
# For every listed column add "<name>_var": the last non-null value of that
# column within data_spec (rows of the same callguid, oldest first, up to the
# current row). Then number the rows of each callguid newest-first and keep
# only row number 1 in df7.
# NOTE(review): if two rows of a callguid share the same cdctimestamp, the row
# numbered 1 may not be the final row of the ascending frame — confirm that
# ties cannot occur or are acceptable.
forward_filled_columns = [
    "creation_time",
    "disconnect_reason",
    "leave_time",
    "local_address",
    "meeting_uuid",
    "mh_address",
    "mixer_address",
    "nat_detected",
    "delta_partition",
    "cdctimestamp",
    "proxy",
    "proxy_info",
    "sip_proxy_transport",
    "audio_med_address",
    "reflector_address",
    "content_med_address",
]
df6 = df5
for source_column in forward_filled_columns:
    latest_non_null = last(source_column, True).over(data_spec)
    df6 = df6.withColumn(source_column + "_var", latest_non_null)
df6 = df6.withColumn("rank", row_number().over(rank_spec))
df7 = df6.filter(df6["rank"] == 1)
# Upsert df7 (alias "s") into the Delta table stored at HDFS_DIR (alias "t").
#   * matched rows: each carried column becomes coalesce(s.<col>_var, t.<col>),
#     i.e. the target value is kept when the source value is null;
#   * unmatched rows: inserted from s.callguid, s.delta_partition and the
#     s.<col>_var values.
# NOTE(review): the match condition uses s.meeting_uuid / s.delta_partition,
# while the insert takes meeting_uuid from s.meeting_uuid_var — confirm these
# key columns are never null on the row kept in df7.
carried_columns = [
    "cdctimestamp",
    "creation_time",
    "disconnect_reason",
    "leave_time",
    "local_address",
    "mh_address",
    "mixer_address",
    "nat_detected",
    "proxy",
    "proxy_info",
    "sip_proxy_transport",
    "audio_med_address",
    "reflector_address",
    "content_med_address",
]

merge_condition = "s.callguid = t.callguid and s.meeting_uuid = t.meeting_uuid and s.delta_partition = t.delta_partition"

update_assignments = {
    column_name: coalesce("s.{}_var".format(column_name), "t.{}".format(column_name))
    for column_name in carried_columns
}

insert_assignments = {
    "callguid": "s.callguid",
    "delta_partition": "s.delta_partition",
    "meeting_uuid": "s.meeting_uuid_var",
}
insert_assignments.update(
    {column_name: "s.{}_var".format(column_name) for column_name in carried_columns}
)

target_table = DeltaTable.forPath(spark, HDFS_DIR)
(
    target_table.alias("t")
    .merge(df7.alias("s"), merge_condition)
    .whenMatchedUpdate(set=update_assignments)
    .whenNotMatchedInsert(values=insert_assignments)
    .execute()
)
Вот скриншоты пользовательского интерфейса Spark (Spark UI). [введите описание изображения здесь] [введите описание изображения здесь]