Проблемы с производительностью PySpark - PullRequest
0 голосов
/ 13 января 2020

Я действительно новичок в spark / pyspark, и мне хотелось бы получить несколько советов.

Итак, у меня есть поток данных из запроса Hive от 2 до 100 миллионов событий (входные данные json), и мне нужно преобразовать это событие, применив к нему функцию. Выходные данные функции затем будут записаны в файл avro.

Так что здесь я немного борюсь: на самом деле мне не нужно записывать в 1 файл, а в 3 разных файла avro (они не не такая же схема). Однако входные данные поступают из того же запроса, и только функция преобразования знает, как преобразовать и дифференцировать входные данные.

, поэтому сейчас я делаю это:

records = df.rdd.map(lambda x: MyFunction(x, otherArg))  # output of MyFunction is a tuple (key, dict) the dict is formated as wanted in the avro. (however there is no schema, I doubt we can have a schema at that stage).


grouped = records.groupByKeys()
grouped.cache()
total_events = grouped.cound()  #1st count 3minutes
k1 = grouped.filter(lambda x: x[0] == 'key1').flatMap(lambda x: list(x[1]))
k2 = grouped.filter(lambda x: x[0] == 'key2').flatMap(lambda x: list(x[1]))
k3 = grouped.filter(lambda x: x[0] == 'key3').flatMap(lambda x: list(x[1]))
k1.cache()
k2.cache()
k3.cache()

nb_k1 = k1.count() #2nd counts about 15 minutes.
if nb_k1 > 0:
    k1file='abfs://p@fw.dfs.core.windows.net/k1.avro'
    avro = sparkSession.read.format('com.databricks.spark.avro').load(k1file)
    k1_df = sparkSession.createDataFrame(k1, avro.schema)
    k1_df.repartition(200).write.mode('overwrite').format('com.databricks.spark.avro').save(k1_loc)

# etc for k2 and k3

PS: у меня есть 4 группы, но только 3 из них заинтересованы в перераспределении: k1 48% k2 2% k3: 0 k4 (не используется) 50%

Итак, этот код работает, но он очень медленный, для его запуска требуется 40 минут на 40 исполнителях 3 миллиона событий.

1-й счет занимает 3 минуты (что довольно быстро, применил ли он MyFun c?, Я так думаю, поскольку он отсортирован),

2-й счет занимает 15 минут, что довольно медленно,

запись занимает от 6 до 10 минут, вероятно, из-за вставленной здесь схемы.

Если у вас есть идеи по поводу того, как оптимизировать это, я слушаю. Также, если вы спросите меня, почему я сделал этот кеш здесь или этот раздел (200) там, он просто пытается увидеть, если он изменит свои параметры,

Я могу изменить MyFun c, чтобы изменить формат вывода, если это необходимо .

ps2: да, у меня также была проблема с тем, как извлечь схему из файла avs c, поэтому я написал другой скрипт для записи пустого файла с использованием схемы:

schema = avro.schema.Parse(open('k1.avsc').read())
writer = DataFileWriter(open("k1.avro", "wb"), DatumWriter(), schema)
writer.close()

...