Структурированная потоковая передача PySpark с Kafka слишком медленно отправляет линии CSV? - PullRequest
0 голосов
/ 14 марта 2019

Это длинный вопрос, но я попытался подробно описать проблему.

У меня есть приложение Spark на основе PySpark и DStream API, получающее данные от Kafka.Однако поддержка Kafka была исключена из новейших версий PySpark для API DStream, как показано в документации Spark .Поэтому я пытаюсь изменить приложение для использования структурированной потоковой передачи.Тем не менее, я сталкиваюсь с проблемами производительности при создании фрейма данных, когда приложение получает данные от Kafka.

Данные отправляются в Kafka в виде строковых строк CSV, и приложение отвечает за получение строк и применение (уже известно) схема для обработки данных.Исходный кадр данных имеет много столбцов, связанных с kafka, а данные сообщения находятся в столбце «значение», как описано здесь .

Сценарий 1

Разделите столбцы в потоковом фрейме данных.Это желаемое решение, так как оно позволило бы мне использовать встроенные функции структурированной потоковой передачи, такие как инкрементные агрегации, водяные знаки, другие режимы вывода, такие как append и т. Д.

Пример кода:

from pyspark.sql import functions as F

def process_dataframe(df, batch_id):
  df.cache()
  count = df.count()
  print('Number of rows: {0}'.format(count))
  # continue df processing...

input_df = spark. \
    readStream. \
    format('kafka'). \
    option('kafka.bootstrap.servers', '127.0.0.1:9092'). \
    option('subscribe', 'some_topic_name'). \
    load()

streaming_df = input_df.selectExpr("CAST(value AS STRING)")
streaming_df = streaming_df.withColumn('temp', F.split('value', ','))
streaming_df = streaming_df.drop('value')

field_names = [('field' + str(idx)) for idx in range(1, 45)]
streaming_df = streaming_df.\
    select(*[F.col('temp').getItem(idx).alias('{0}'.format(column_name))
             for idx, column_name in enumerate(field_names)])
streaming_df.printSchema()
query = streaming_df. \
    writeStream. \
    trigger(processingTime='300 seconds'). \
    outputMode('update'). \
    foreachBatch(process_dataframe). \
    start()

query.awaitTermination()

Результаты производительности этой реализации не очень хорошие по сравнению с моим предыдущим приложением API DStream.Для 3,3 миллиона строк каждые 5 минут (время обработки) обработка данных занимает от 3 до 3,5 минут (только для разделения столбцов и подсчета строк).

Я попытался найти основную причину этогои я в итоге посмотрел на физический план, который генерирует оптимизатор.Кажется, что последний запрос вызывает «cast» и «split» для каждого столбца входящих данных.Вот физический план:

== Physical Plan ==
*(1) Project [split(cast(value#746 as string), ,)[0] AS field1#27, split(cast(value#746 as string), ,)[1] AS field2#28, split(cast(value#746 as string), ,)[2] AS field3#29, split(cast(value#746 as string), ,)[3] AS field4#30, split(cast(value#746 as string), ,)[4] AS field5#31, split(cast(value#746 as string), ,)[5] AS field6#32, split(cast(value#746 as string), ,)[6] AS field7#33, split(cast(value#746 as string), ,)[7] AS field8#34, split(cast(value#746 as string), ,)[8] AS field9#35, split(cast(value#746 as string), ,)[9] AS field10#36, split(cast(value#746 as string), ,)[10] AS field11#37, split(cast(value#746 as string), ,)[11] AS field12#38, split(cast(value#746 as string), ,)[12] AS field13#39, split(cast(value#746 as string), ,)[13] AS field14#40, split(cast(value#746 as string), ,)[14] AS field15#41, split(cast(value#746 as string), ,)[15] AS field16#42, split(cast(value#746 as string), ,)[16] AS field17#43, split(cast(value#746 as string), ,)[17] AS field18#44, split(cast(value#746 as string), ,)[18] AS field19#45, split(cast(value#746 as string), ,)[19] AS field20#46, split(cast(value#746 as string), ,)[20] AS field21#47, split(cast(value#746 as string), ,)[21] AS field22#48, split(cast(value#746 as string), ,)[22] AS field23#49, split(cast(value#746 as string), ,)[23] AS field24#50, ... 20 more fields]
+- *(1) Project [key#745, value#746, topic#747, partition#748, offset#749L, timestamp#750, timestampType#751]
   +- *(1) ScanV2 kafka[key#745, value#746, topic#747, partition#748, offset#749L, timestamp#750, timestampType#751] (Options: [subscribe=ip_flow_imsi_64_partitions_new_kafka_version,kafka.bootstrap.servers=127.0.0.1:90...)

Для дальнейшего изучения проблемы я реализовал вторую версию, которая описана в Сценарии 2.

Сценарий 2

Не разделяйте столбцы в потоковом фрейме данных, просто получите входящие строки в виде строк (через запятую) и разделите столбцы позже в (не потоковом) фрейме данных.

Пример кода:

from pyspark.sql import functions as F

def process_dataframe(df, batch_id):
  df = df.withColumn('temp', F.split('value', ','))
  df = df.drop('value')
  df.cache()
  df.count()
  field_names = [('field' + str(idx)) for idx in range(1, 45)]
  df = df.\
    select(*[F.col('temp').getItem(idx).alias('{0}'.format(column_name))
             for idx, column_name in enumerate(field_names)])
  df.cache()
  count = df.count()
  print('Number of rows: {0}'.format(count))
  # continue df processing...

input_df = spark. \
    readStream. \
    format('kafka'). \
    option('kafka.bootstrap.servers', '127.0.0.1:9092'). \
    option('subscribe', 'some_topic_name'). \
    load()

streaming_df = input_df.selectExpr("CAST(value AS STRING)")

query = streaming_df. \
    writeStream. \
    trigger(processingTime='300 seconds'). \
    outputMode('update'). \
    foreachBatch(process_dataframe). \
    start()

query.awaitTermination()

Хитрость в том, что я кеширую после вызова split, а затем начинаю создавать столбцы.Производительность этого подхода намного лучше.При одинаковом количестве строк на одном компьютере обработка кадра данных занимает около 31 секунды.Физический план этого подхода показывает, что разделение вызывается только один раз, поскольку после разделения кадр данных кэшируется:

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#413L])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#636L])
      +- InMemoryTableScan
            +- InMemoryRelation [field1#55, field2#56, field3#57, field4#58, field5#59, field6#60, field7#61, field8#62, field9#63, field10#64, field11#65, field12#66, field13#67, field14#68, field15#69, field16#70, field17#71, field18#72, field19#73, field20#74, field21#75, field22#76, field23#77, field24#78, ... 20 more fields], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *(1) Project [temp#35[0] AS field1#55, temp#35[1] AS field2#56, temp#35[2] AS field3#57, temp#35[3] AS field4#58, temp#35[4] AS field5#59, temp#35[5] AS field6#60, temp#35[6] AS field7#61, temp#35[7] AS field8#62, temp#35[8] AS field9#63, temp#35[9] AS field10#64, temp#35[10] AS field11#65, temp#35[11] AS field12#66, temp#35[12] AS field13#67, temp#35[13] AS field14#68, temp#35[14] AS field15#69, temp#35[15] AS field16#70, temp#35[16] AS field17#71, temp#35[17] AS field18#72, temp#35[18] AS field19#73, temp#35[19] AS field20#74, temp#35[20] AS field21#75, temp#35[21] AS field22#76, temp#35[22] AS field23#77, temp#35[23] AS field24#78, ... 20 more fields]
                     +- InMemoryTableScan [temp#35]
                           +- InMemoryRelation [temp#35], StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- *(1) Project [split(value#34, ,) AS temp#35]
                                    +- *(1) SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, value), StringType), true, false) AS value#34]
                                       +- Scan[obj#33]

Итак, мой вопрос: что я делаю неправильно в первом подходе?Правильно ли мое понимание того, что причиной этой разницы в производительности является использование многократного приведения и разделения?Если это так, поскольку я не могу вызвать кэш или сохранить его на потоковом фрейме данных , есть ли способ заставить оптимизатор вызвать кэш и разделить только один раз?

...