Я действительно новичок в spark / pyspark, и мне хотелось бы получить несколько советов.
Итак, у меня есть поток данных из запроса Hive от 2 до 100 миллионов событий (входные данные json), и мне нужно преобразовать это событие, применив к нему функцию. Выходные данные функции затем будут записаны в файл avro.
Так что здесь я немного борюсь: на самом деле мне не нужно записывать в 1 файл, а в 3 разных файла avro (они не не такая же схема). Однако входные данные поступают из того же запроса, и только функция преобразования знает, как преобразовать и дифференцировать входные данные.
, поэтому сейчас я делаю это:
records = df.rdd.map(lambda x: MyFunction(x, otherArg)) # output of MyFunction is a tuple (key, dict) the dict is formated as wanted in the avro. (however there is no schema, I doubt we can have a schema at that stage).
grouped = records.groupByKeys()
grouped.cache()
total_events = grouped.cound() #1st count 3minutes
k1 = grouped.filter(lambda x: x[0] == 'key1').flatMap(lambda x: list(x[1]))
k2 = grouped.filter(lambda x: x[0] == 'key2').flatMap(lambda x: list(x[1]))
k3 = grouped.filter(lambda x: x[0] == 'key3').flatMap(lambda x: list(x[1]))
k1.cache()
k2.cache()
k3.cache()
nb_k1 = k1.count() #2nd counts about 15 minutes.
if nb_k1 > 0:
k1file='abfs://p@fw.dfs.core.windows.net/k1.avro'
avro = sparkSession.read.format('com.databricks.spark.avro').load(k1file)
k1_df = sparkSession.createDataFrame(k1, avro.schema)
k1_df.repartition(200).write.mode('overwrite').format('com.databricks.spark.avro').save(k1_loc)
# etc for k2 and k3
PS: у меня есть 4 группы, но только 3 из них заинтересованы в перераспределении: k1 48% k2 2% k3: 0 k4 (не используется) 50%
Итак, этот код работает, но он очень медленный, для его запуска требуется 40 минут на 40 исполнителях 3 миллиона событий.
1-й счет занимает 3 минуты (что довольно быстро, применил ли он MyFun c?, Я так думаю, поскольку он отсортирован),
2-й счет занимает 15 минут, что довольно медленно,
запись занимает от 6 до 10 минут, вероятно, из-за вставленной здесь схемы.
Если у вас есть идеи по поводу того, как оптимизировать это, я слушаю. Также, если вы спросите меня, почему я сделал этот кеш здесь или этот раздел (200) там, он просто пытается увидеть, если он изменит свои параметры,
Я могу изменить MyFun c, чтобы изменить формат вывода, если это необходимо .
ps2: да, у меня также была проблема с тем, как извлечь схему из файла avs c, поэтому я написал другой скрипт для записи пустого файла с использованием схемы:
schema = avro.schema.Parse(open('k1.avsc').read())
writer = DataFileWriter(open("k1.avro", "wb"), DatumWriter(), schema)
writer.close()