Набор данных Spark с колонкой добавить идентификатор раздела - PullRequest
0 голосов
/ 05 июня 2019

Я пытаюсь написать вспомогательную функцию, которая получает набор данных любого типа Dataset[_] и возвращает с одним новым столбцом "partitionId", который является идентификатором раздела, которому принадлежит один блок данных.

Например, если у меня есть набор данных ниже и по умолчанию он имеет два раздела.

+-----+------+
| colA|  colB|
+-----+------+
|   1 |     a|
|   2 |     b|
|   3 |     c|
+-----+------+

После функции должен быть приведенный ниже результат, где первые два блока данных принадлежат одному и тому же разделу.и третий принадлежит другому разделу.

+-----+------+------------+
| colA|  colB| partitionId|
+-----+------+------------+
|   1 |     a|           1|
|   2 |     b|           1|
|   3 |     c|           2|
+-----+------+------------+

Я пробовал с withColumn () и mapPartitions (), но ни один из них не работал для меня.Для withColumn () я не смог получить информацию о том, к какому разделу принадлежит блок данных, например withColumn("partitionId", {What should be here to add the partitionId?}) Для mapPartitions (), я попытался:

dataset
  .mapPartitions(iter => {
    val partitionId = UUID.randomUUID().toString
    iter.map(dataUnit => MyDataType.addPartitionId(partitionId))
  })

Но это работает только для определенного типакак Dataset[MyDataType], а не для Dataset[_]

Как добавить столбец partitionId для любого набора данных?

1 Ответ

2 голосов
/ 05 июня 2019

Есть ли причина, по которой вам нужен идентификатор раздела каждой записи? В любом случае, вы можете достичь этого:

import org.apache.spark.sql.functions.spark_partition_id
...
dataFrame.withColumn("partitionID", spark_partition_id)
...