Есть какие-нибудь короли искры?
Вариант использования: у меня есть кадр данных в 1 миллион строк, я хочу обрабатывать 5 строк в json за раз без потери параллелизма.
Пример кадра данных (df):
+-------------+---------+
| col_a | col_b |
+-------------+---------+
| row1a | row1b |
| row2a | row2b |
| row3a | row3b |
| row4a | row4b |
| row5a | row5b |
| row6a | row6b |
| row7a | row7b |
| .. | .. |
+-------------+---------+
Текущий рабочий раствор
zipwithindex
row_id_df = df.rdd.map(lambda x: json.dumps(x.asDict())).zipWithIndex().toDF(["item", "id"])
Над строкой преобразует кадр данных в
dataframe (row_id_df):
+--------------------------------------+--------+
| item | id |
+--------------------------------------+--------+
| {"col_a": "row1a", "col_b": "row1b"} | 0 |
| {"col_a": "row2a", "col_b": "row2b"} | 1 |
| {"col_a": "row3a", "col_b": "row3b"} | 2 |
| {"col_a": "row4a", "col_b": "row4b"} | 3 |
| {"col_a": "row5a", "col_b": "row5b"} | 4 |
| {"col_a": "row6a", "col_b": "row6b"} | 5 |
| {"col_a": "row7a", "col_b": "row7b"} | 6 |
| .. | .. |
+--------------------------------------+--------+
На данный момент у меня есть все строки с идентификатором, теперь я выполняю групповую операцию, используя выражение, которое группирует каждые 5 элементов в группу.
splitBy = (floor(col("id") / lit(5)) * lit(5)) \
.cast(IntegerType()).alias("id")
row_id_df.groupBy(splitBy) \
.agg(collect_list(col("item"))) \
.select(col("collect_list(item)").alias("items")) \
.rdd.foreach(process_each_5)
process_each_5(data):
print(len(data.items)) // 5
Я смог это сделать, и прекрасно работает. Но я чувствую, что есть другой способ сделать это гораздо проще.
И, наконец, от к кадру данных, который мне нужно завершить выше объяснение:
С:
+-------------+---------+
| col_a | col_b |
+-------------+---------+
| row1a | row1b |
| row2a | row2b |
| row3a | row3b |
| row4a | row4b |
| row5a | row5b |
| row6a | row6b |
| row7a | row7b |
| .. | .. |
+-------------+---------+
Кому:
+-------------------------------------------+
| items |
+-------------------------------------------+
| [{"col_a": "row1a", "col_b": "row1b"}, |
| {"col_a": "row2a", "col_b": "row2b"}, |
| {"col_a": "row3a", "col_b": "row3b"}, |
| {"col_a": "row4a", "col_b": "row4b"}, |
| {"col_a": "row5a", "col_b": "row5b"}] |
| [{"col_a": "row6a", "col_b": "row6b"}, |
| {"col_a": "row7a", "col_b": "row7b"},...]|
| .. |
+-------------------------------------------+
PS: я не хочу использовать df.collect ()