Я сталкиваюсь с тем, что на первый взгляд новичку Spark Streaming кажется нелогичным:
когда Spark Structured Streaming начинает обрабатывать больше данных, продолжительность пакета уменьшается
Это, вероятно, не самая точная картина, но я видел намного более четкую картину .. ![enter image description here](https://i.stack.imgur.com/koxUy.png)
Мне, вероятно, нужно объяснить что именно является продолжительностью партии - Насколько я понимаю, он представляет количество секунд, которое требуется Spark для обработки мини-пакета потоковой передачи.
Далее мне нужно пояснить , как Spark запускает обработку мини-пакета - зависит ли это от объема данных в пакете или временных интервалов ...
РЕДАКТИРОВАТЬ
Код следующий.Здесь довольно много «тяжелых» операций (соединения, dropDuplicates, фильтрация с HOF, udfs, ...).Sink и Source являются центрами событий Azure
# [CONFIGS]
ehConfig = {
'eventhubs.startingPosition': '{"offset": "@latest", "enqueuedTime": null, 'isInclusive': true,'seqNo': -1}',
'eventhubs.maxEventsPerTrigger': 300,
'eventhubs.connectionString'='XXX'}
ehOutputConfig = {
'eventhubs.connectionString'='YYY' ,
"checkpointLocation": "azure_blob_storage/ABCABC"
}
spark.conf.set("spark.sql.shuffle.partitions", 3)
# [FUNCS]
@udf(TimestampType())
def udf_current_timestamp():
return datetime.now()
#-----------#
# STREAMING #
#-----------#
# [STREAM INPUT]
df_stream_input = spark.readStream.format("eventhubs").options(**_ehConfig).load()
# [ASSEMBLY THE DATAFRAME]
df_joined = (df_stream_input
.withColumn("InputProcessingStarted", current_timestamp().cast("long"))
# Decode body
.withColumn("body_decoded", from_json(col("body").cast("string"), schema=_config))
# Join customer
.join(df_batch, ['CUSTOMER_ID'], 'inner')
# Filtering
.filter(expr('body_decoded.status NOT IN (0, 4, 32)'))
.filter(expr('EXISTS(body_decoded.items, item -> item.ID IN (1, 2, 7))'))
# Deduplication
.withWatermark('enqueuedTime', '1 day')
.dropDuplicates(['CUSTOMER_ID', 'ItemID'])
# Join with lookup table
.join(broadcast(df_lookup), ['OrderType'], 'left')
# UDF
.withColumn('AssembleTimestamp', udf_current_timestamp())
# Assemble struct
.withColumn('body_struct', struct('OrderType', 'OrderID', 'Price', 'StockPile'))
# [STREAM OUTPUT]
(df_joined
.select(to_json('body_struct').alias('body'))
.writeStream
.format("eventhubs")
.options(**_ehOutputConfig)
.trigger(processingTime='2 seconds')
.start())