Свести несколько столбцов данных в один - PullRequest
0 голосов
/ 13 мая 2019

Каждый день я получаю файл с ~ 2k столбцами.Есть 900 столбцов "отношения".Например:

    data.id | name | AGE |data.rel.1 | data.rel.2 | data.rel.1.type | data.rel.2.type
    12      | JOE  | 25  | ASDF      | QWER       | order           | order
    23      | TIM  | 20  | AAAA      | SSSS       | product         | product
    34      | BRAD | 32  | XXXX      | null       | order           | null
    11      | MATT | 23  | ASDF      | QWER       | agreement       | agreement

Мне нужно сгладить данные и создать фрейм данных "id - rel - rel type", который будет содержать только data.id, data.rel и data.rel.type

    data.id | data.rel | data.rel.type
    12      | ASDF     | order   
    12      | QWER     | order        
    23      | AAAA     | product    
    23      | SSSS     | product     
    34      | XXXX     | order   
    11      | ASDF     | agreement   
    11      | QWER     | agreement

Похоже, что это решение работает с одним столбцом, однако я не уверен, как включить столбец rel.type в ту же логику:

   pattern = '/*rel/*'     

   def explode(row,pattern):
       for c in row.asDict():
           if re.search(pattern, c):
               yield (row['data_id'],row[c])


    df.rdd.flatMap(lambda r:explode(r,pattern))
             .toDF(['data_id','data_rel'])
             .filter(F.col('data_rel').isNotNull())
             .show()

Есть идеи?

Ответы [ 2 ]

3 голосов
/ 13 мая 2019

Вот решение

import pyspark.sql.functions as F

df = spark.createDataFrame(
    [(12, 'JOE', 25, 'ASDF', 'QWER', 'ZXCV'),
    (23, 'TIM', 20, 'AAAA', 'SSSS', 'DDDD'),
    (34, 'BRAD', 32, 'XXXX', None, None),
    (11, 'MATT', 23, 'ASDF', 'QWER', None)],
    ['data_id','name','AGE','data_rel_1','data_rel_2','data_rel_3']
)

# Create an array of the columns you want
cols = F.array(
    *[F.col(c).alias(c) for c in ['data_rel_1', 'data_rel_2', 'data_rel_3']]
)

df.withColumn(
    "data_rel", cols
).select(
    'data_id',F.explode('data_rel').alias('data_rel')
).filter(
    F.col('data_rel').isNotNull()
).show()

, что приводит к:

+-------+--------+
|data_id|data_rel|
+-------+--------+
|     12|    ASDF|
|     12|    QWER|
|     12|    ZXCV|
|     23|    AAAA|
|     23|    SSSS|
|     23|    DDDD|
|     34|    XXXX|
|     11|    ASDF|
|     11|    QWER|
+-------+--------+

РЕДАКТИРОВАТЬ Другое решение, использующее rdd и также взорвать, может принять шаблон в качестве параметра (Это может не привести к исключениям с большим количеством столбцов)

import pyspark.sql.functions as F

#takes pattern, and explodes those cols which match pattern
def explode(row,pattern):
    import re
    for c in row.asDict():
        if re.search(pattern, c):
            yield (row['data_id'],row[c])

df = spark.createDataFrame(
    [(12, 'JOE', 25, 'ASDF', 'QWER', 'ZXCV'),
    (23, 'TIM', 20, 'AAAA', 'SSSS', 'DDDD'),
    (34, 'BRAD', 32, 'XXXX', None, None),
    (11, 'MATT', 23, 'ASDF', 'QWER', None)],['data_id','name','AGE','data_rel_1','data_rel_2','data_rel_3']
)
pattern = '/*rel/*'
df.rdd.flatMap(
    lambda r:explode(r,pattern)
).toDF(
    ['data_id','data_rel']
).filter(
    F.col('data_rel').isNotNull()
).show()
1 голос
/ 13 мая 2019

Не знаю Python, я не смог бы ответить здесь .. написал в scala. Вы можете попробовать перевести на Python. - сначала выберите data.id и data.rel.1 как df1 аналогично data.id и data.rel.2 как df2 и data.id и data.rel.3 как df3

Теперь у вас есть 3 кадра данных, затем объедините их, чтобы получить результат выше

import org.apache.spark.sql.{ SparkSession}

/**
  * Created by Ram Ghadiyaram
  */
object DFUnionExample {

  def main(args: Array[String]) {

    val sparkSession = SparkSession.builder.
      master("local")
      .appName("DFUnionExample")
      .getOrCreate()

    import sparkSession.implicits._

    val basedf = Seq((12, "JOE", 25, "ASDF", "QWER", "ZXCV"),
      (23, "TIM", 20, "AAAA", "SSSS", "DDDD"),
      (34, "BRAD", 32, "XXXX", null, null),
      (11, "MATT", 23, "ASDF", "QWER", null)
    ).toDF("data.id", "name", "AGE", "data.rel.one", "data.rel.two", "data.rel.three")
    basedf.show
    import org.apache.spark.sql.functions._
     val df1 =   basedf.select(col("`data.id`"),col("`data.rel.one`"))
        val df2 =basedf.select(col("`data.id`"),col("`data.rel.two`"))
        val df3 =   basedf.select(col("`data.id`"),col("`data.rel.three`"))
        df1.union(df2).union(df3)
          .select(col("`data.id`"),col("`data.rel.one`").as("data.rel"))
          .filter(col("`data.rel`").isNotNull)
          .sort(col("`data.id`")).show
  }
}

Результат:

+-------+----+---+------------+------------+--------------+
|data.id|name|AGE|data.rel.one|data.rel.two|data.rel.three|
+-------+----+---+------------+------------+--------------+
|     12| JOE| 25|        ASDF|        QWER|          ZXCV|
|     23| TIM| 20|        AAAA|        SSSS|          DDDD|
|     34|BRAD| 32|        XXXX|        null|          null|
|     11|MATT| 23|        ASDF|        QWER|          null|
+-------+----+---+------------+------------+--------------+

+-------+--------+
|data.id|data.rel|
+-------+--------+
|     11|    QWER|
|     11|    ASDF|
|     12|    ASDF|
|     12|    QWER|
|     12|    ZXCV|
|     23|    AAAA|
|     23|    DDDD|
|     23|    SSSS|
|     34|    XXXX|
+-------+--------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...