Spark Structured Streaming - скачок скорости ввода уменьшает длительность пакета - PullRequest
0 голосов
/ 23 сентября 2019

Я сталкиваюсь с тем, что на первый взгляд новичку Spark Streaming кажется нелогичным:

когда Spark Structured Streaming начинает обрабатывать больше данных, продолжительность пакета уменьшается

Это, вероятно, не самая точная картина, но я видел намного более четкую картину .. enter image description here

Мне, вероятно, нужно объяснить что именно является продолжительностью партии - Насколько я понимаю, он представляет количество секунд, которое требуется Spark для обработки мини-пакета потоковой передачи.

Далее мне нужно пояснить , как Spark запускает обработку мини-пакета - зависит ли это от объема данных в пакете или временных интервалов ...

РЕДАКТИРОВАТЬ
Код следующий.Здесь довольно много «тяжелых» операций (соединения, dropDuplicates, фильтрация с HOF, udfs, ...).Sink и Source являются центрами событий Azure

# [CONFIGS]
ehConfig = {
'eventhubs.startingPosition': '{"offset": "@latest", "enqueuedTime": null, 'isInclusive': true,'seqNo': -1}',
'eventhubs.maxEventsPerTrigger': 300,
'eventhubs.connectionString'='XXX'}

ehOutputConfig = {
'eventhubs.connectionString'='YYY' ,   
"checkpointLocation": "azure_blob_storage/ABCABC"
}

spark.conf.set("spark.sql.shuffle.partitions", 3)

# [FUNCS]
@udf(TimestampType())
def udf_current_timestamp():
  return datetime.now()

#-----------#
# STREAMING # 
#-----------#

# [STREAM INPUT]
df_stream_input = spark.readStream.format("eventhubs").options(**_ehConfig).load()

# [ASSEMBLY THE DATAFRAME]
df_joined = (df_stream_input
             .withColumn("InputProcessingStarted", current_timestamp().cast("long"))

             # Decode body
             .withColumn("body_decoded", from_json(col("body").cast("string"), schema=_config))

             # Join customer 
             .join(df_batch, ['CUSTOMER_ID'], 'inner')

             # Filtering
             .filter(expr('body_decoded.status NOT IN (0, 4, 32)'))
             .filter(expr('EXISTS(body_decoded.items, item -> item.ID IN (1, 2, 7))'))

             # Deduplication
             .withWatermark('enqueuedTime', '1 day') 
             .dropDuplicates(['CUSTOMER_ID', 'ItemID']) 

             # Join with lookup table
             .join(broadcast(df_lookup), ['OrderType'], 'left') 

             # UDF
             .withColumn('AssembleTimestamp', udf_current_timestamp())

             # Assemble struct 
             .withColumn('body_struct', struct('OrderType', 'OrderID', 'Price', 'StockPile'))

 # [STREAM OUTPUT]
(df_joined
 .select(to_json('body_struct').alias('body'))
 .writeStream
 .format("eventhubs")
 .options(**_ehOutputConfig)
 .trigger(processingTime='2 seconds')
 .start())

1 Ответ

0 голосов
/ 23 сентября 2019

В структурированной потоковой передаче Spark он запускает новый пакет, как только предыдущий пакет завершил обработку, если вы не укажете опцию триггера.

В более ранней версии Spark с потоковой передачей Spark мы могли указать длительность пакета, скажем,5 секунд.В этом случае он будет запускать микропакет каждые 5 секунд и обрабатывать данные, поступившие за последние 5 секунд.В случае kafka, он получит данные, которые не были зафиксированы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...