Разделяйте хорошие и плохие строки на разных фреймах в PySpark - PullRequest
0 голосов
/ 25 марта 2020

Я работаю над заданием PySpark, которое анализирует различные типы файлов журнала доступа.

Мне нужно читать текстовые строки из файлов, которые могут соответствовать или не соответствовать заданному регулярному выражению.

Хорошее соответствует go для данных, которые будут записаны как паркет. Плохое совпадение go с другим фреймом данных, который будет записан в виде обычного текста (как исходные файлы).

Чтобы добиться этого, я добавляю столбец «bad_row» во время построения RDD в функции карты линий.

В этом столбце после выделения оставьте остальные столбцы со значениями None. Аналогично, когда строка хорошо совпадает, я оставляю этот столбец со значением None.

Таким образом, я могу разделить фрейм данных на хороший и плохой.

Но с этим решением, когда я go записываю их на диск, это очень дорого, потому что, когда я пишу второй, Spark приходится на другом этапе переоценивать весь исходный фрейм данных.

Производительность очень плохо, если у меня очень большие исходные данные.

Я попытался добавить постоянный слой, чтобы избежать бесполезных переоценок, и я также попытался выполнить "маленькую" проверку на неверный фрейм данных перед тем, как записать его.

Вот пример моего кода:

# this add the "bad_row" column in case of bad matches
def custom_parse(line, regex, fields):
    match = re.search(regex, line)
    if match is None:
        row = {}
        for field in fields:
            row[field] = None
        row["_bad_line"] = line
        return Row(**row)
    else:
        row = {}
        for i, field in enumerate(fields):
            row[field] = match.group(i+1)
        row["_bad_line"] = None
        return Row(**row)

rdd = spark.sparkContext.textFile("s3a://source-bucket/prefix/")
rdd = rdd.repartition(numPartitions=(120))
rdd = rdd.map(lambda line: custom_parse(line, regex,, schema_fields))
df = spark.createDataFrame(rdd, schema=custom_schema)
df.cache()

bad_df = df.filter(df["_bad_line"].isNotNull())
bad_df = bad_df.select("_bad_line").dropna(how="all")
good_df = df.filter(df["_bad_line"].isNull())
good_df = good_df.drop("_bad_line").dropna(how="all")

good_df.write.mode('overwrite').parquet("s3://dest-bucket/good-prefix/")
if len(bad_df.head(1)) != 0:
    bad_df.write.mode('overwrite').text("s3://dest-bucket/bad-prefix/")
df.unpersist()

Есть ли у вас какие-либо предложения для достижения этого более эффективным способом?

...