Как обработать n строк в фрейме данных Pyspark или RDD - PullRequest
0 голосов
/ 29 июня 2018

Есть какие-нибудь короли искры?

Вариант использования: у меня есть кадр данных в 1 миллион строк, я хочу обрабатывать 5 строк в json за раз без потери параллелизма.

Пример кадра данных (df):

+-------------+---------+
|    col_a    |  col_b  |
+-------------+---------+
| row1a       | row1b   |
| row2a       | row2b   |
| row3a       | row3b   |
| row4a       | row4b   |
| row5a       | row5b   |
| row6a       | row6b   |
| row7a       | row7b   |
| ..          | ..      |
+-------------+---------+

Текущий рабочий раствор

zipwithindex

row_id_df = df.rdd.map(lambda x: json.dumps(x.asDict())).zipWithIndex().toDF(["item", "id"])

Над строкой преобразует кадр данных в

dataframe (row_id_df):

+--------------------------------------+--------+
|                    item              |   id   |
+--------------------------------------+--------+
| {"col_a": "row1a", "col_b": "row1b"} |   0    |
| {"col_a": "row2a", "col_b": "row2b"} |   1    |
| {"col_a": "row3a", "col_b": "row3b"} |   2    |
| {"col_a": "row4a", "col_b": "row4b"} |   3    |
| {"col_a": "row5a", "col_b": "row5b"} |   4    |
| {"col_a": "row6a", "col_b": "row6b"} |   5    |
| {"col_a": "row7a", "col_b": "row7b"} |   6    |
| ..                                   | ..     |
+--------------------------------------+--------+

На данный момент у меня есть все строки с идентификатором, теперь я выполняю групповую операцию, используя выражение, которое группирует каждые 5 элементов в группу.

splitBy = (floor(col("id") / lit(5)) * lit(5)) \
                   .cast(IntegerType()).alias("id")

row_id_df.groupBy(splitBy) \
            .agg(collect_list(col("item"))) \
            .select(col("collect_list(item)").alias("items")) \
            .rdd.foreach(process_each_5)

process_each_5(data):
    print(len(data.items)) // 5

Я смог это сделать, и прекрасно работает. Но я чувствую, что есть другой способ сделать это гораздо проще.

И, наконец, от к кадру данных, который мне нужно завершить выше объяснение:

С:

+-------------+---------+
|    col_a    |  col_b  |
+-------------+---------+
| row1a       | row1b   |
| row2a       | row2b   |
| row3a       | row3b   |
| row4a       | row4b   |
| row5a       | row5b   |
| row6a       | row6b   |
| row7a       | row7b   |
| ..          | ..      |
+-------------+---------+

Кому:

+-------------------------------------------+
|                    items                  |
+-------------------------------------------+
| [{"col_a": "row1a", "col_b": "row1b"},    |
|  {"col_a": "row2a", "col_b": "row2b"},    |
|  {"col_a": "row3a", "col_b": "row3b"},    |
|  {"col_a": "row4a", "col_b": "row4b"},    |
|  {"col_a": "row5a", "col_b": "row5b"}]    |
| [{"col_a": "row6a", "col_b": "row6b"},    |
|  {"col_a": "row7a", "col_b": "row7b"},...]|
| ..                                        |
+-------------------------------------------+

PS: я не хочу использовать df.collect ()

...